"""
Optimized core routines for the 1D Lattice Weyl-Wigner / Wigner-Poisson model.
Implementation notes:
- solve the Wigner linear system directly with LAPACK band storage
- avoid converting band matrices to dense Nx*N by Nx*N arrays
- vectorize Fermi boundaries, current/density, Poisson charge sums, and potential coefficients
- cache kinetic stencil and sine matrix used in the non-local potential
"""
from __future__ import annotations
from dataclasses import dataclass, replace
from functools import lru_cache
import math
from typing import Optional
import numpy as np
import scipy.linalg as spla
from ._native_kernels import (
CPP_AVAILABLE,
NUMBA_AVAILABLE,
OPENMP_ENABLED,
OPENMP_THREADS,
curcalc_current as _native_curcalc_current,
curcalc_density as _native_curcalc_density,
fill_potential_banded as _native_fill_potential_banded,
fill_scattering_banded as _native_fill_scattering_banded,
resolve_backend,
)
[docs]
@dataclass(frozen=True)
class Params:
# Grid
Nx: int = 86
N: int = 72
# Physical constants
c: float = 2997.924580
rmo: float = 0.05685675170
rm: float = 6.67e-2
temp: float = 77.0
rhc: float = 1973.29
re: float = 2.0
rno: float = 1.0e-6
ri: float = 0.3
# Geometry
box: float = 550.0
well: float = 50.0
barrier: float = 30.0
spacer: float = 30.0
pot: float = 0.3
# Other
chemp: float = 0.0863814
@property
def rmass(self) -> float:
return self.rm * self.rmo * (self.c**2)
@property
def bbeta(self) -> float:
return 11604.5 / self.temp
@property
def dens(self) -> float:
return self.re * self.rno
@property
def densi(self) -> float:
return (1.0 + self.ri) * self.re * self.rno / (1.0 - self.ri)
@property
def delx(self) -> float:
return self.box / (self.Nx - 1)
@property
def delk(self) -> float:
return math.pi / (self.delx * self.N)
@property
def x(self) -> np.ndarray:
return np.linspace(0.0, self.box, self.Nx)
DEFAULT = Params()
# Backwards-compatible module-level constants.
Nx = DEFAULT.Nx
N = DEFAULT.N
c = DEFAULT.c
rmo = DEFAULT.rmo
rm = DEFAULT.rm
temp = DEFAULT.temp
rmass = DEFAULT.rmass
bbeta = DEFAULT.bbeta
rhc = DEFAULT.rhc
re = DEFAULT.re
rno = DEFAULT.rno
ri = DEFAULT.ri
dens = DEFAULT.dens
densi = DEFAULT.densi
box = DEFAULT.box
well = DEFAULT.well
barrier = DEFAULT.barrier
spacer = DEFAULT.spacer
pot = DEFAULT.pot
delx = DEFAULT.delx
delk = DEFAULT.delk
pi = math.pi
chemp = DEFAULT.chemp
x = DEFAULT.x
def _p(params: Optional[Params]) -> Params:
return DEFAULT if params is None else params
def _kernel_backend(use_numba: bool | None = True, kernel_backend: str = "auto") -> str:
return resolve_backend(kernel_backend, bool(use_numba))
[docs]
@dataclass(slots=True)
class WignerSolveWorkspace:
"""Reusable buffers for the Wigner banded solve.
The LAPACK buffer uses ``dgbsv`` storage directly. Its lower view has the
same shape as SciPy's compact band format, so the existing assembly kernels
can fill it while LAPACK can factor the full buffer without another matrix
allocation/copy inside ``solve_banded``.
"""
n: int
nx: int
lower_upper: int
lapack_ab: np.ndarray
rhs: np.ndarray
rvs_work: np.ndarray
[docs]
@classmethod
def create(cls, n: int = N, nx: int = Nx) -> "WignerSolveWorkspace":
lower_upper = 2 * n
size = n * nx
return cls(
n=n,
nx=nx,
lower_upper=lower_upper,
lapack_ab=np.zeros((1 + 3 * lower_upper, size), dtype=float, order="F"),
rhs=np.empty((size, 1), dtype=float, order="F"),
rvs_work=np.empty(nx, dtype=float),
)
@property
def band(self) -> np.ndarray:
return self.lapack_ab[self.lower_upper:, :]
[docs]
def band_to_agb(a: np.ndarray, params: Optional[Params] = None) -> np.ndarray:
"""Convert dense square matrix to LAPACK/SciPy general band storage."""
p = _p(params)
m = n = p.N * p.Nx
ml = mu = 2 * p.N
agb = np.zeros((ml + mu + 1, n), dtype=a.dtype)
for j in range(n):
i0 = max(0, j - mu)
i1 = min(m, j + ml + 1)
rows = mu + np.arange(i0, i1) - j
agb[rows, j] = a[i0:i1, j]
return agb
[docs]
def agb_to_band(agb: np.ndarray, params: Optional[Params] = None) -> np.ndarray:
"""Convert LAPACK/SciPy general band storage to a dense square matrix."""
p = _p(params)
m = n = p.N * p.Nx
ml = mu = 2 * p.N
a = np.zeros((m, n), dtype=agb.dtype)
for j in range(n):
i0 = max(0, j - mu)
i1 = min(m, j + ml + 1)
rows = mu + np.arange(i0, i1) - j
a[i0:i1, j] = agb[rows, j]
return a
def fermi_function(n: int = N, chemp_value: float = chemp, params: Optional[Params] = None) -> np.ndarray:
p = _p(params)
j = np.arange(1, n + 1, dtype=float)
deno = p.rmass / (math.pi * p.bbeta * (p.rhc**2))
rhmc = (p.rhc * p.delk) ** 2 / (8.0 * p.rmass)
arg = p.bbeta * (chemp_value - rhmc * (2.0 * j - n - 1.0) ** 2)
# logaddexp is stable for large positive/negative arguments.
return deno * np.logaddexp(0.0, arg)
def fbndry(b: np.ndarray, n: int = N, nx: int = Nx, chemp_value: float = chemp,
params: Optional[Params] = None) -> np.ndarray:
"""Fill boundary source vector in-place and return it."""
p = _p(params)
b.fill(0.0)
cte = p.delk * (p.rhc**2) / (4.0 * p.delx * p.rmass)
nh = n // 2
f = fermi_function(n, chemp_value, p)
j = np.arange(1, n + 1)
ram = cte * (2 * j - n - 1)
pos = np.arange(nh, n) # 0-based j indices for k > 0
neg = np.arange(0, nh) # 0-based j indices for k < 0
b[pos] = -3.0 * ram[pos] * f[pos]
b[n + pos] = ram[pos] * f[pos]
b[nx * n - n + neg] = 3.0 * ram[neg] * f[neg]
b[nx * n - 2 * n + neg] = -ram[neg] * f[neg]
return b
def rvscalc(rvs: np.ndarray, nx: int = Nx, bias: float = 0.0, isc: int = 0,
params: Optional[Params] = None) -> np.ndarray:
"""Calculate external double-barrier potential in-place and return it."""
p = _p(params)
r1 = (p.box - p.well) / 2.0 - p.barrier
r2 = (p.box - p.well) / 2.0
ra = r1 - p.spacer
ira = int(ra / p.delx) + 1
nxh = nx // 2
if isc == 0:
rvs[:nx] = 0.0
elif isc == 2:
idx = np.arange(nxh)
rx = idx * p.delx
vals = 0.18 * p.pot / (1.0 + np.exp((r1 - 50.0 - rx) / 28.0))
rvs[idx] = vals
rvs[nx - 1 - idx] = vals
idx = np.arange(max(ira - 1, 0), nxh)
rx = idx * p.delx
mask = (rx <= r2) & (rx >= r1)
idxb = idx[mask]
rvs[idxb] += p.pot
rvs[nx - 1 - idxb] += p.pot
if isc == 0:
rvs[nx - ira:nx] -= bias
slope = bias / (p.well + 4.0 * p.barrier)
idx = np.arange(ira, nxh)
rx = idx * p.delx - ra
mask = rx >= 0.0
idx = idx[mask]
rx = rx[mask]
rvs[idx] -= slope * rx
rvs[nx - 1 - idx] += slope * (rx - p.well - 4.0 * p.barrier)
return rvs
[docs]
def integral(x_grid: np.ndarray, y: np.ndarray, axis: int = 0) -> np.ndarray:
dx = x_grid[1] - x_grid[0]
return np.sum(y * dx, axis=axis)
[docs]
def get_exchange(rho: np.ndarray, x_grid: Optional[np.ndarray] = None) -> np.ndarray:
rho_safe = np.maximum(rho, 0.0)
return -((3.0 / math.pi) ** (1.0 / 3.0)) * np.cbrt(rho_safe)
[docs]
def get_hartree(rho: np.ndarray, x_grid: np.ndarray, eps: float = 1e-1) -> np.ndarray:
dx = x_grid[1] - x_grid[0]
return np.sum(rho[None, :] * dx / np.sqrt((x_grid[None, :] - x_grid[:, None]) ** 2 + eps), axis=-1)
@lru_cache(maxsize=16)
def _sin_matrix(n: int) -> np.ndarray:
j = np.arange(1, n)[:, None]
ip = np.arange(1, n // 2 + 1)[None, :]
return np.sin((2.0 * math.pi / n) * np.mod(ip * j, n))
[docs]
def aicalc(rvs: np.ndarray, ai: Optional[np.ndarray] = None, n: int = N, nx: int = Nx, i: int = 1,
params: Optional[Params] = None) -> np.ndarray:
"""Vectorized non-local potential coefficient for one 1-based spatial index i."""
i0 = i - 1
ips = np.arange(1, n // 2 + 1)
right = np.minimum(i0 + ips, nx - 1)
left = np.maximum(i0 - ips, 0)
diff = rvs[right] - rvs[left]
out = np.zeros(n) if ai is None else ai
out.fill(0.0)
out[: n - 1] = (2.0 / n) * (_sin_matrix(n) @ diff)
return out
def _potential_rvs(
external_rvs: np.ndarray,
rho: np.ndarray,
exchange: bool,
params: Params,
rvs_work: Optional[np.ndarray] = None,
) -> np.ndarray:
source = np.asarray(external_rvs, dtype=float)
if not exchange:
return source
if rvs_work is None:
rvs = source.copy()
else:
if rvs_work.shape != source.shape:
raise ValueError(f"rvs workspace must have shape {source.shape}")
np.copyto(rvs_work, source)
rvs = rvs_work
rvs += get_exchange(np.asarray(rho, dtype=float), params.x)
return rvs
def _fill_potential_banded_inplace(
amx: np.ndarray,
external_rvs: np.ndarray,
rho: np.ndarray,
n: int = N,
nx: int = Nx,
exchange: bool = False,
params: Optional[Params] = None,
use_numba: bool | None = True,
kernel_backend: str = "auto",
rvs_work: Optional[np.ndarray] = None,
) -> np.ndarray:
p = _p(params)
rvs = _potential_rvs(external_rvs, rho, exchange, p, rvs_work)
backend = _kernel_backend(use_numba, kernel_backend)
if _native_fill_potential_banded(amx, rvs, _sin_matrix(n), n, nx, backend):
return amx
center = 2 * n
ai = np.empty(n)
for ix in range(nx):
aicalc(rvs, ai, n, nx, ix + 1, p)
base = ix * n
for j0 in range(n - 1):
val = ai[j0]
if val == 0.0:
continue
# Original 1-based indexing converted to 0-based rows/columns.
amx[center + (j0 + 1), base: base + n - (j0 + 1)] = -val
amx[center - (j0 + 1), base + (j0 + 1): base + n] = val
return amx
[docs]
def potential(external_rvs: np.ndarray, rho: np.ndarray, n: int = N, nx: int = Nx,
exchange: bool = False, params: Optional[Params] = None,
use_numba: bool | None = True, kernel_backend: str = "auto") -> np.ndarray:
"""Return potential matrix directly in banded storage: shape (4*n+1, n*nx)."""
amx = np.zeros((4 * n + 1, n * nx), dtype=float)
return _fill_potential_banded_inplace(
amx,
external_rvs,
rho,
n,
nx,
exchange,
params,
use_numba,
kernel_backend,
)
@lru_cache(maxsize=16)
def _kinetic_cached(n: int, nx: int, irlx: int, delk_: float, delx_: float,
rhc_: float, rmass_: float) -> np.ndarray:
T = np.zeros((4 * n + 1, n * nx), dtype=float)
cte = delk_ * (rhc_**2) / (4.0 * delx_ * rmass_)
cols = np.arange(n * nx)
j0 = cols % n
j1 = j0 + 1
ram = cte * (2 * j1 - n - 1)
nh = n // 2
neg = j0 < nh
T[4 * n, neg] = 0.0
T[3 * n, neg] = 0.0
T[2 * n, neg] = 3.0 * ram[neg]
T[1 * n, neg] = -4.0 * ram[neg]
T[0, neg] = ram[neg]
pos = ~neg
T[4 * n, pos] = -ram[pos]
T[3 * n, pos] = 4.0 * ram[pos]
T[2 * n, pos] = -3.0 * ram[pos]
T[1 * n, pos] = 0.0
T[0, pos] = 0.0
if irlx >= 3:
T[2 * n, :] += -2.0 * 0.6582186935
return T
def kinetic(n: int = N, nx: int = Nx, irlx: int = 0, params: Optional[Params] = None) -> np.ndarray:
p = _p(params)
return _kinetic_cached(n, nx, irlx, p.delk, p.delx, p.rhc, p.rmass).copy()
[docs]
def arelax(S: np.ndarray, b: np.ndarray, n: int = N, nx: int = Nx,
params: Optional[Params] = None, use_numba: bool | None = True,
kernel_backend: str = "auto") -> np.ndarray:
"""Relaxation/scattering band matrix. No debug printing."""
tau = 0.5255074e3
tcol = 0.65821869340 / tau
backend = _kernel_backend(use_numba, kernel_backend)
if _native_fill_scattering_banded(S, np.ascontiguousarray(b, dtype=float), n, nx, tcol, backend):
return S
b2 = np.asarray(b).reshape(nx, n)
rho = b2.sum(axis=1)
center = 2 * n
with np.errstate(divide="ignore", invalid="ignore"):
weights = np.where(rho[:, None] != 0.0, tcol * b2 / rho[:, None], 0.0)
for ix in range(nx):
base = ix * n
for j in range(n):
# Add column-wise equilibrium redistribution.
for jp in range(n):
S[center + j - jp, base + jp] += weights[ix, j]
S[center, base + j] -= tcol
return S
[docs]
def scattering(b: np.ndarray, n: int = N, nx: int = Nx, params: Optional[Params] = None,
use_numba: bool | None = True, kernel_backend: str = "auto") -> np.ndarray:
return arelax(np.zeros((4 * n + 1, n * nx), dtype=float), b, n, nx, params, use_numba, kernel_backend)
def _assemble_wigner_banded_inplace(
out: np.ndarray,
bo: np.ndarray,
rvs: np.ndarray,
rho: np.ndarray,
n: int = N,
nx: int = Nx,
irlx: int = 0,
exchange: bool = False,
params: Optional[Params] = None,
use_numba: bool | None = True,
kernel_backend: str = "auto",
rvs_work: Optional[np.ndarray] = None,
) -> np.ndarray:
p = _p(params)
np.copyto(out, _kinetic_cached(n, nx, irlx, p.delk, p.delx, p.rhc, p.rmass))
_fill_potential_banded_inplace(
out,
rvs,
rho,
n,
nx,
exchange,
p,
use_numba,
kernel_backend,
rvs_work,
)
if irlx != 0:
arelax(out, bo, n, nx, p, use_numba, kernel_backend)
return out
def _solve_wigner_lapack(
lapack_ab: np.ndarray,
rhs: np.ndarray,
lower_upper: int,
overwrite: bool,
workspace: Optional[WignerSolveWorkspace] = None,
) -> np.ndarray:
size = lapack_ab.shape[1]
rhs_arr = np.asarray(rhs, dtype=float)
if rhs_arr.shape != (size,):
raise ValueError(f"rhs must have shape ({size},)")
can_overwrite_rhs = (
overwrite
and isinstance(rhs, np.ndarray)
and rhs_arr is rhs
and rhs_arr.flags.c_contiguous
and rhs_arr.dtype == np.float64
)
if can_overwrite_rhs:
rhs_2d = rhs_arr.reshape(size, 1)
elif workspace is not None and workspace.rhs.shape == (size, 1):
rhs_2d = workspace.rhs
rhs_2d[:, 0] = rhs_arr
else:
rhs_2d = np.array(rhs_arr.reshape(size, 1), dtype=float, order="F", copy=True)
_, _, solution, info = spla.lapack.dgbsv(
lower_upper,
lower_upper,
lapack_ab,
rhs_2d,
overwrite_ab=True,
overwrite_b=True,
)
if info < 0:
raise ValueError(f"LAPACK dgbsv argument {-info} had an illegal value")
if info > 0:
raise np.linalg.LinAlgError(f"LAPACK dgbsv found a singular pivot at U({info}, {info})")
solved = solution[:, 0]
if overwrite:
if can_overwrite_rhs and np.shares_memory(solved, rhs):
return rhs
rhs[:] = solved
return rhs
return solved.copy()
def wigstd(f: np.ndarray, bo: np.ndarray, rvs: np.ndarray, rho: np.ndarray,
n: int = N, nx: int = Nx, irlx: int = 0, exchange: bool = False,
params: Optional[Params] = None, overwrite: bool = True,
use_numba: bool | None = True, kernel_backend: str = "auto",
workspace: Optional[WignerSolveWorkspace] = None) -> np.ndarray:
"""
Solve the Wigner steady-state equation.
This path keeps T, V, and S in banded storage and calls LAPACK ``dgbsv``
directly. With a ``WignerSolveWorkspace`` it reuses the LAPACK band and RHS
buffers across transient steps.
"""
lower_upper = 2 * n
size = n * nx
if workspace is not None:
if workspace.n != n or workspace.nx != nx or workspace.lower_upper != lower_upper:
raise ValueError("workspace dimensions do not match n/nx")
lapack_ab = workspace.lapack_ab
band = workspace.band
rvs_work = workspace.rvs_work
else:
lapack_ab = np.zeros((1 + 3 * lower_upper, size), dtype=float, order="F")
band = lapack_ab[lower_upper:, :]
rvs_work = None
_assemble_wigner_banded_inplace(
band,
bo,
rvs,
rho,
n,
nx,
irlx,
exchange,
params,
use_numba,
kernel_backend,
rvs_work,
)
return _solve_wigner_lapack(lapack_ab, f, lower_upper, overwrite, workspace)
[docs]
def tridag(a: np.ndarray, b: np.ndarray, c: np.ndarray, r: np.ndarray, nx: int) -> np.ndarray:
"""Thomas algorithm for tridiagonal systems."""
u = np.zeros(nx, dtype=float)
gam = np.zeros(nx, dtype=float)
bet = b[0]
if bet == 0.0:
raise ZeroDivisionError("tridag failed: zero pivot at row 0")
u[0] = r[0] / bet
for j in range(1, nx):
gam[j] = c[j - 1] / bet
bet = b[j] - a[j] * gam[j]
if bet == 0.0:
raise ZeroDivisionError(f"tridag failed: zero pivot at row {j}")
u[j] = (r[j] - a[j] * u[j - 1]) / bet
for j in range(nx - 2, -1, -1):
u[j] -= gam[j + 1] * u[j + 1]
return u
def poissonb(b: np.ndarray, rvs: np.ndarray, nx: int = Nx, n: int = N, vo: float = 0.0,
vb: float = 0.0, rfa: float = 650.0, amax: Optional[np.ndarray] = None,
params: Optional[Params] = None, save_csv: bool = False) -> np.ndarray:
p = _p(params)
b2 = np.asarray(b).reshape(nx, n)
charge_sum = b2.sum(axis=1)
deno = 12.910 * 8.8542e-12 / (1e10 * 1.6e-19)
ep2 = 1.0 / p.delx
cc = p.delk / (2.0 * math.pi)
rx = np.arange(nx) * p.delx
r1 = (p.box - p.well) / 2.0 - p.barrier - p.spacer
r4 = (p.box + p.well) / 2.0 + p.barrier + p.spacer
inside = (rx >= r1) & (rx <= r4)
rhs = np.empty(nx, dtype=float)
rhs[inside] = (cc * charge_sum[inside] / deno) * p.delx / 2.0
rhs[~inside] = -((p.dens - cc * charge_sum[~inside]) / deno) * p.delx / 2.0
rhs[0] += ep2 * vo / 2.0
rhs[-1] += ep2 * vb / 2.0
a = np.full(nx, -0.5 * ep2)
bg = np.full(nx, 1.0 * ep2)
cvec = np.full(nx, -0.5 * ep2)
pvec = np.empty(nx, dtype=float)
pvec[0] = (0.5 * rvs[1] - rvs[0]) * ep2 + rhs[0]
pvec[-1] = (-rvs[-1] + 0.5 * rvs[-2]) * ep2 + rhs[-1]
pvec[1:-1] = (0.5 * rvs[2:] - rvs[1:-1] + 0.5 * rvs[:-2]) * ep2 + rhs[1:-1]
rvs_new = tridag(a, bg, cvec, pvec, nx)
if save_csv:
np.savetxt("poisson_eqsol.csv", rvs, delimiter=",", fmt="%s")
max_abs = float(np.max(np.abs(rvs_new)))
if amax is not None:
amax[0] = max_abs
du = 1.0 if max_abs == 0.0 else math.log1p(rfa * max_abs) / (rfa * max_abs)
rvs[:] = du * rvs_new + rvs
rvscalc(rvs, nx, 0.0, 1, p)
return rvs
def poisson(b: np.ndarray, xa: np.ndarray, nx: int = Nx, n: int = N, vo: float = 0.0,
vb: float = 0.0, m: int = 1, params: Optional[Params] = None) -> np.ndarray:
p = _p(params)
b2 = np.asarray(b).reshape(nx, n)
charge_sum = b2.sum(axis=1)
deno = 12.910 * 8.8542e-12 / (1.0e10 * 1.6e-19)
cc = p.delk / (2.0 * math.pi)
rx = np.arange(nx) * p.delx
r1 = (p.box - p.well) / 2.0 - p.barrier - p.spacer
r4 = (p.box + p.well) / 2.0 + p.barrier + p.spacer
inside = (rx >= r1) & (rx <= r4)
rhs = np.empty(nx, dtype=float)
rhs[inside] = ((cc * charge_sum[inside] / deno) * (p.delx**2)) / 2.0
rhs[~inside] = -(((p.dens - cc * charge_sum[~inside]) / deno) * (p.delx**2)) / 2.0
rhs[0] += vo / 2.0
rhs[-1] += vb / 2.0
a = np.full(nx, -0.5)
bg = np.ones(nx)
cvec = np.full(nx, -0.5)
xa[:] = tridag(a, bg, cvec, rhs, nx)
rvscalc(xa, nx, 0.0, 1, p)
return xa
def curcalc(b: np.ndarray, rj: np.ndarray, n: int = N, nx: int = Nx, irj: int = 2,
params: Optional[Params] = None, use_numba: bool | None = True,
kernel_backend: str = "auto") -> np.ndarray:
p = _p(params)
B = np.asarray(b).reshape(nx, n)
nh = n // 2
if irj == 2:
cofj = (5915774.594 * 1.6e12 * p.delk**2) / (8.0 * math.pi * p.rmass)
backend = _kernel_backend(use_numba, kernel_backend)
if _native_curcalc_current(np.ascontiguousarray(B), rj, nx, n, cofj, backend):
return rj
jneg = np.arange(1, nh + 1)
wneg = (2 * jneg - n - 1)
jpos = np.arange(nh + 1, n + 1)
wpos = (2 * jpos - n - 1)
vals = np.zeros(nx)
# Original uses 1-based i=2..Nx-2, accessing rows i, i+1 and i-2, i-1 in 0-based B.
for i0 in range(1, nx - 2):
vals[i0] = (
np.dot(wneg, 3.0 * B[i0 + 1, :nh] - B[i0 + 2, :nh])
+ np.dot(wpos, -B[i0 - 1, nh:] + 3.0 * B[i0, nh:])
)
rj[:] = cofj * vals
rj[1] = 2.0 * rj[2] - rj[3]
rj[0] = rj[1]
rj[nx - 2] = 2.0 * rj[nx - 3] - rj[nx - 4]
rj[nx - 1] = rj[nx - 2]
elif irj == 1:
coef = p.delk / (2.0 * math.pi)
backend = _kernel_backend(use_numba, kernel_backend)
if _native_curcalc_density(np.ascontiguousarray(B), rj, nx, n, coef, backend):
return rj
rj[:] = coef * B.sum(axis=1)
else:
raise ValueError("irj must be 1 for density or 2 for current")
return rj
def comp(x: np.ndarray, xo: np.ndarray, n: int = N, nx: int = Nx, eps: float = 7.0e-10) -> tuple[int, float]:
amx = float(np.max(np.abs(np.asarray(x[:nx]) - np.asarray(xo[:nx]))))
return (0 if amx > eps else 1), amx
def subsrvs(rvs: np.ndarray, nx: int = Nx, params: Optional[Params] = None) -> np.ndarray:
p = _p(params)
r1 = (p.box - p.well) / 2.0 - p.barrier
r2 = (p.box - p.well) / 2.0
ra = r1 - p.barrier
ira = int(ra / p.delx) + 1
nxh = nx // 2
idx = np.arange(max(ira - 1, 0), nxh)
rx = idx * p.delx
idx = idx[(rx <= r2) & (rx >= r1)]
rvs[idx] -= p.pot
rvs[nx - 1 - idx] -= p.pot
return rvs
[docs]
def initial_state(params: Optional[Params] = None, exchange: bool = True, irlx_value: int = 0):
"""Compute the default steady-state initialization."""
p = _p(params)
rvs = np.zeros(p.Nx)
rho = np.zeros(p.Nx)
f = np.zeros(p.N * p.Nx)
rj = np.zeros(p.Nx)
rvscalc(rvs, p.Nx, 0.0, 2, p)
fbndry(f, p.N, p.Nx, p.chemp, p)
wigstd(f, f, rvs, rho, p.N, p.Nx, irlx_value, exchange, p)
curcalc(f, rj, p.N, p.Nx, 2, p)
return rvs, rho, f, rj
# -----------------------------------------------------------------------------
# Compatibility layer for the lww_transport package API
# -----------------------------------------------------------------------------
# Supports both array-first calls, e.g. fbndry(b, n, nx), and config-first calls,
# e.g. fbndry(cfg, b).
_orig_fbndry = fbndry
_orig_rvscalc = rvscalc
_orig_kinetic = kinetic
_orig_wigstd = wigstd
_orig_poissonb = poissonb
_orig_poisson = poisson
_orig_curcalc = curcalc
_orig_comp = comp
_orig_subsrvs = subsrvs
_orig_fermi_function = fermi_function
def _is_cfg(obj) -> bool:
return hasattr(obj, "nx") and hasattr(obj, "n")
def _cfg_params(cfg, params: Optional[Params] = None) -> Params:
"""Build a core ``Params`` object from ``LWWConfig``."""
vals = {}
for field in Params.__dataclass_fields__:
if field == "Nx":
vals[field] = int(getattr(cfg, "nx", DEFAULT.Nx))
elif field == "N":
vals[field] = int(getattr(cfg, "n", DEFAULT.N))
else:
vals[field] = getattr(cfg, field, getattr(DEFAULT, field))
return Params(**vals)
def _extra_pos_to_kwargs(extra, names, kwargs):
kwargs = dict(kwargs)
for name, value in zip(names, extra):
kwargs.setdefault(name, value)
return kwargs
[docs]
def fbndry(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
if len(args) < 2:
raise TypeError("fbndry(cfg, b) requires b")
b = args[1]
p = _cfg_params(cfg, kwargs.pop("params", None))
chemp_value = kwargs.pop("chemp_value", getattr(p, "chemp", chemp))
return _orig_fbndry(b, n=int(cfg.n), nx=int(cfg.nx), chemp_value=chemp_value, params=p)
return _orig_fbndry(*args, **kwargs)
[docs]
def fermi_function(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
p = _cfg_params(cfg, kwargs.pop("params", None))
chemp_value = kwargs.pop("chemp_value", getattr(p, "chemp", chemp))
return _orig_fermi_function(n=int(cfg.n), chemp_value=chemp_value, params=p)
return _orig_fermi_function(*args, **kwargs)
[docs]
def rvscalc(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
if len(args) < 2:
raise TypeError("rvscalc(cfg, rvs, ...) requires rvs")
rvs = args[1]
extra = args[2:]
kwargs = _extra_pos_to_kwargs(extra, ["bias", "isc"], kwargs)
p = _cfg_params(cfg, kwargs.pop("params", None))
bias = kwargs.pop("bias", 0.0)
isc = kwargs.pop("isc", 0)
return _orig_rvscalc(rvs, nx=int(cfg.nx), bias=bias, isc=isc, params=p)
return _orig_rvscalc(*args, **kwargs)
[docs]
def kinetic(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
p = _cfg_params(cfg, kwargs.pop("params", None))
if len(args) > 1:
kwargs.setdefault("irlx", args[1])
return _orig_kinetic(n=int(cfg.n), nx=int(cfg.nx), params=p, **kwargs)
return _orig_kinetic(*args, **kwargs)
[docs]
def wigstd(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
if len(args) < 5:
raise TypeError("wigstd(cfg, f, bo, rvs, rho, ...) requires five positional arguments")
f, bo, rvs, rho = args[1], args[2], args[3], args[4]
p = _cfg_params(cfg, kwargs.pop("params", None))
return _orig_wigstd(
f, bo, rvs, rho,
n=int(cfg.n),
nx=int(cfg.nx),
irlx=kwargs.pop("irlx", 0),
exchange=kwargs.pop("exchange", False),
params=p,
overwrite=kwargs.pop("overwrite", True),
use_numba=kwargs.pop("use_numba", getattr(cfg, "use_numba", True)),
kernel_backend=kwargs.pop("kernel_backend", getattr(cfg, "kernel_backend", "auto")),
workspace=kwargs.pop("workspace", None),
)
return _orig_wigstd(*args, **kwargs)
[docs]
def poissonb(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
if len(args) < 3:
raise TypeError("poissonb(cfg, b, rvs, ...) requires b and rvs")
b, rvs = args[1], args[2]
kwargs = _extra_pos_to_kwargs(args[3:], ["vo", "vb", "rfa", "amax"], kwargs)
p = _cfg_params(cfg, kwargs.pop("params", None))
return _orig_poissonb(
b, rvs,
nx=int(cfg.nx),
n=int(cfg.n),
vo=kwargs.pop("vo", 0.0),
vb=kwargs.pop("vb", 0.0),
rfa=kwargs.pop("rfa", 650.0),
amax=kwargs.pop("amax", None),
params=p,
save_csv=kwargs.pop("save_csv", False),
)
return _orig_poissonb(*args, **kwargs)
[docs]
def poisson(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
if len(args) < 3:
raise TypeError("poisson(cfg, b, xa, ...) requires b and xa")
b, xa = args[1], args[2]
kwargs = _extra_pos_to_kwargs(args[3:], ["vo", "vb", "m"], kwargs)
p = _cfg_params(cfg, kwargs.pop("params", None))
return _orig_poisson(
b, xa,
nx=int(cfg.nx),
n=int(cfg.n),
vo=kwargs.pop("vo", 0.0),
vb=kwargs.pop("vb", 0.0),
m=kwargs.pop("m", 1),
params=p,
)
return _orig_poisson(*args, **kwargs)
[docs]
def curcalc(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
if len(args) < 3:
raise TypeError("curcalc(cfg, b, mode) or curcalc(cfg, b, rj, ...) requires at least b and mode/rj")
b = args[1]
third = args[2]
if np.isscalar(third):
rj = np.zeros(int(cfg.nx), dtype=float)
kwargs = _extra_pos_to_kwargs(args[3:], [], kwargs)
irj = kwargs.pop("irj", int(third))
else:
rj = third
kwargs = _extra_pos_to_kwargs(args[3:], ["irj"], kwargs)
irj = kwargs.pop("irj", 2)
p = _cfg_params(cfg, kwargs.pop("params", None))
return _orig_curcalc(
b, rj,
n=int(cfg.n),
nx=int(cfg.nx),
irj=irj,
params=p,
use_numba=kwargs.pop("use_numba", getattr(cfg, "use_numba", True)),
kernel_backend=kwargs.pop("kernel_backend", getattr(cfg, "kernel_backend", "auto")),
)
return _orig_curcalc(*args, **kwargs)
[docs]
def comp(*args, **kwargs) -> tuple[int, float]:
if args and _is_cfg(args[0]):
cfg = args[0]
if len(args) < 3:
raise TypeError("comp(cfg, x, xo, ...) requires x and xo")
x, xo = args[1], args[2]
kwargs = _extra_pos_to_kwargs(args[3:], ["eps"], kwargs)
return _orig_comp(x, xo, nx=int(cfg.nx), eps=kwargs.pop("eps", 7.0e-10))
return _orig_comp(*args, **kwargs)
[docs]
def subsrvs(*args, **kwargs) -> np.ndarray:
if args and _is_cfg(args[0]):
cfg = args[0]
if len(args) < 2:
raise TypeError("subsrvs(cfg, rvs) requires rvs")
rvs = args[1]
p = _cfg_params(cfg, kwargs.pop("params", None))
return _orig_subsrvs(rvs, nx=int(cfg.nx), params=p)
return _orig_subsrvs(*args, **kwargs)