Source code for psiop

# Copyright 2026 Philippe Billet assisted by LLMs in free mode: chatGPT, Qwen, Deepseek, Gemini, Claude, le chat Mistral.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
psiop.py — Symbolic‑numerical toolkit for pseudo‑differential operators in 1D/2D
=======================================================================================

Overview
--------
The `psiop` module provides a unified framework for manipulating pseudo‑differential operators (ΨDOs) in one and two spatial dimensions.  It combines **symbolic** construction of operator symbols (using SymPy) with **numerical** evaluation and visualisation (using NumPy/SciPy/Matplotlib).  The package is designed for researchers and students working in microlocal analysis, spectral theory, and the numerical analysis of PDEs.

Key features include:

* Symbol creation either from an explicit expression (symbol mode) or by automatic extraction from a differential operator (auto mode).
* Computation of asymptotic expansions of symbols at high frequencies.
* Determination of the operator order (homogeneity degree) via symbolic/numerical heuristics.
* Asymptotic composition of operators in Kohn‑Nirenberg or Weyl quantisation.
* Construction of formal left/right inverses and the formal adjoint.
* Symbol of the exponential `exp(tP)` via series expansion.
* Semiclassical trace formula (symbolic or numerical).
* Hamiltonian flow associated with the principal symbol (symplectic phase‑space dynamics).
* Pseudospectrum computation and visualisation with adaptive grid refinement and parallelisation.
* Rich visualisation suite: symbol amplitude/phase, cotangent‑fiber structure, characteristic set, group velocity field, micro‑support, Hamiltonian trajectories.
* Interactive dashboard (ipywidgets) for real‑time exploration of the symbol.

Mathematical background
-----------------------
A pseudo‑differential operator `P` acting on functions of `x ∈ ℝⁿ` is formally defined by its **symbol** `p(x,ξ)`, a function on phase space `T*ℝⁿ`.  The action on a function `u` is given by the Kohn‑Nirenberg quantisation

    (P u)(x) = (2π)^{-n} ∫_{ℝⁿ} e^{i x·ξ} p(x,ξ) û(ξ) dξ ,

where `û` is the Fourier transform of `u`.  If the symbol does not depend on `x`, the operator reduces to a Fourier multiplier.  For a general spatially varying symbol, the above representation provides a rigorous extension of differential operators.

The **asymptotic behaviour** of the symbol as `|ξ| → ∞` determines many properties of the operator.  The **principal symbol** `pₘ` is the homogeneous component of highest order `m`.  When the symbol is non‑homogeneous, the module attempts to estimate the effective order by expanding in inverse powers of `|ξ|` (or in a radial variable for 2D).

**Symbolic calculus** allows one to compose operators asymptotically.  For two symbols `p` and `q`, the symbol of the composition `P ∘ Q` is given by an asymptotic series

    (p ∘ q)(x,ξ) ~ Σ_{α} (i)^{-|α|}/α! ∂_ξ^α p(x,ξ) ∂_x^α q(x,ξ)

in the Kohn‑Nirenberg convention (a similar expansion exists for the Weyl star product).  Truncating this series yields approximate compositions valid for high frequencies or slowly varying symbols.

**Formal inverses** and **adjoints** are constructed via similar asymptotic series, assuming the principal symbol never vanishes.

The **Hamiltonian flow** generated by the principal symbol describes the propagation of singularities along bicharacteristics – a cornerstone of microlocal analysis.

The **pseudospectrum** `σ_ε(P)` is the set of `λ ∈ ℂ` for which `‖(P-λI)^{-1}‖ ≥ ε^{-1}`.  It captures the near‑spectral behaviour of non‑normal operators and is visualised via contour plots of the resolvent norm.

References
----------
.. [1] Hörmander, L.  *The Analysis of Linear Partial Differential Operators III*, Springer, 1985.  Chapter 18: Pseudo‑differential Operators.
.. [2] Taylor, M. E.  *Pseudo Differential Operators*, Princeton University Press, 1981.
.. [3] Zworski, M.  *Semiclassical Analysis*, American Mathematical Society, 2012.  Chapter 4: Pseudo‑differential Operators.
.. [4] Martinez, A.  *An Introduction to Semiclassical and Microlocal Analysis*, Springer, 2002.
.. [5] Trefethen, L. N. & Embree, M.  *Spectra and Pseudospectra*, Princeton University Press, 2005.  (For pseudospectrum methods.)
"""

from imports import *
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings

[docs] class PseudoDifferentialOperator: """ Pseudo-differential operator with dynamic symbol evaluation on spatial grids. Supports both 1D and 2D operators, and can be defined explicitly (symbol mode) or extracted automatically from symbolic equations (auto mode). Parameters ---------- expr : sympy expression Symbolic expression representing the pseudo-differential symbol. vars_x : list of sympy symbols Spatial variables (e.g., [x] for 1D, [x, y] for 2D). var_u : sympy function, optional Function u(x, t) used in auto mode to extract the operator symbol. mode : str, {'symbol', 'auto'} - 'symbol': directly uses expr as the operator symbol. - 'auto': computes the symbol automatically by applying expr to exp(i x ξ). Attributes ---------- dim : int Spatial dimension (1 or 2). fft, ifft : callable Fast Fourier transform and inverse (scipy.fft or scipy.fft2). p_func : callable Evaluated symbol function ready for numerical use. Notes ----- - In 'symbol' mode, `expr` should be expressed in terms of spatial variables and frequency variables (ξ, η). - In 'auto' mode, the symbol is derived by applying the differential expression to a complex exponential. - Frequency variables are internally named 'xi' and 'eta' for consistency. - Uses numpy for numerical evaluation and scipy.fft for FFT operations. Examples -------- >>> # Example 1: 1D Laplacian operator (symbol mode) >>> from sympy import symbols >>> x, xi = symbols('x xi', real=True) >>> op = PseudoDifferentialOperator(expr=xi**2, vars_x=[x], mode='symbol') >>> # Example 2: 1D transport operator (auto mode) >>> from sympy import Function >>> u = Function('u') >>> expr = u(x).diff(x) >>> op = PseudoDifferentialOperator(expr=expr, vars_x=[x], var_u=u(x), mode='auto') """ def __init__(self, expr, vars_x, var_u=None, mode='symbol', quantization='weyl'): self.dim = len(vars_x) self.mode = mode self.symbol_cached = None self.expr = expr self.vars_x = vars_x self.quantization = quantization if self.dim == 1: x, = vars_x xi_internal = symbols('xi', real=True) expr = expr.subs(symbols('xi', real=True), xi_internal) self.fft = partial(fft, workers=FFT_WORKERS) self.ifft = partial(ifft, workers=FFT_WORKERS) if mode == 'symbol': self.p_func = lambdify((x, xi_internal), expr, 'numpy') self.symbol = expr elif mode == 'auto': if var_u is None: raise ValueError("var_u must be provided in mode='auto'") exp_i = exp(I * x * xi_internal) P_ei = expr.subs(var_u, exp_i) symbol = simplify(P_ei / exp_i) symbol = expand(symbol) self.symbol = symbol self.p_func = lambdify((x, xi_internal), symbol, 'numpy') else: raise ValueError("mode must be 'auto' or 'symbol'") elif self.dim == 2: x, y = vars_x xi_internal, eta_internal = symbols('xi eta', real=True) expr = expr.subs(symbols('xi', real=True), xi_internal) expr = expr.subs(symbols('eta', real=True), eta_internal) self.fft = partial(fft2, workers=FFT_WORKERS) self.ifft = partial(ifft2, workers=FFT_WORKERS) if mode == 'symbol': self.symbol = expr self.p_func = lambdify((x, y, xi_internal, eta_internal), expr, 'numpy') elif mode == 'auto': if var_u is None: raise ValueError("var_u must be provided in mode='auto'") exp_i = exp(I * (x * xi_internal + y * eta_internal)) P_ei = expr.subs(var_u, exp_i) symbol = simplify(P_ei / exp_i) symbol = expand(symbol) self.symbol = symbol self.p_func = lambdify((x, y, xi_internal, eta_internal), symbol, 'numpy') else: raise ValueError("mode must be 'auto' or 'symbol'") else: raise NotImplementedError("Only 1D and 2D supported") if mode == 'auto': self._compute_symbol_derivatives() print("\nsymbol = ") pprint(self.symbol, num_columns=NUM_COLS) def _compute_symbol_derivatives(self): """Compute derivatives for WKB application.""" self.derivatives = {} if self.dim == 1: x = self.vars_x[0] xi = symbols('xi', real=True) self.derivatives['dp_dx'] = diff(self.symbol, x) self.derivatives['dp_dxi'] = diff(self.symbol, xi) self.derivatives['d2p_dxi2'] = diff(self.symbol, xi, 2) self.derivatives['d2p_dx2'] = diff(self.symbol, x, 2) self.derivatives['d2p_dxidx'] = diff(diff(self.symbol, xi), x) elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) self.derivatives['dp_dx'] = diff(self.symbol, x) self.derivatives['dp_dy'] = diff(self.symbol, y) self.derivatives['dp_dxi'] = diff(self.symbol, xi) self.derivatives['dp_deta'] = diff(self.symbol, eta) self.derivatives['d2p_dxi2'] = diff(self.symbol, xi, 2) self.derivatives['d2p_deta2'] = diff(self.symbol, eta, 2) self.derivatives['d2p_dx2'] = diff(self.symbol, x, 2) self.derivatives['d2p_dy2'] = diff(self.symbol, y, 2) self.derivatives['d2p_dxidx'] = diff(diff(self.symbol, xi), x) self.derivatives['d2p_detady'] = diff(diff(self.symbol, eta), y) # Lambdify for numerical evaluation if self.dim == 1: vars_tuple = (self.vars_x[0], symbols('xi', real=True)) else: vars_tuple = tuple(self.vars_x) + (symbols('xi', real=True), symbols('eta', real=True)) for name, expr in self.derivatives.items(): setattr(self, f'_{name}_func', lambdify(vars_tuple, expr, 'numpy'))
[docs] def evaluate(self, X, Y, KX, KY, cache=True): """ Evaluate the pseudo-differential operator's symbol on a grid of spatial and frequency coordinates. The method dynamically selects between 1D and 2D evaluation based on the spatial dimension. If caching is enabled and a cached symbol exists, it returns the cached result to avoid recomputation. Parameters ---------- X, Y : ndarray Spatial grid coordinates. In 1D, Y is ignored. KX, KY : ndarray Frequency grid coordinates. In 1D, KY is ignored. cache : bool, default=True If True, stores the computed symbol for reuse in subsequent calls to avoid redundant computation. Returns ------- ndarray Evaluated symbol values over the input grid. Shape matches the input spatial/frequency grids. Raises ------ NotImplementedError If the spatial dimension is not 1D or 2D. """ if cache and self.symbol_cached is not None: return self.symbol_cached if self.dim == 1: symbol = self.p_func(X, KX) elif self.dim == 2: symbol = self.p_func(X, Y, KX, KY) if cache: self.symbol_cached = symbol return symbol
[docs] def clear_cache(self): """ Clear cached symbol evaluations. """ self.symbol_cached = None
[docs] def apply(self, u, x_grid, kx, boundary_condition='periodic', y_grid=None, ky=None, dealiasing_mask=None, freq_window='gaussian', clamp=1e6, space_window=False): """ Apply the pseudo-differential operator to the input field u. This method dispatches the application of the pseudo-differential operator based on: - Whether the symbol is spatially dependent (x/y) - The boundary condition in use (periodic or dirichlet) Supported operations: - Constant-coefficient symbols: applied via Fourier multiplication. - Spatially varying symbols: applied via Kohn–Nirenberg quantization. - Dirichlet boundary conditions: handled with non-periodic convolution-like quantization. Dispatch Logic:\n if not self.is_spatial: u ↦ Op(p)(D) ⋅ u = 𝓕⁻¹[ p(ξ) ⋅ 𝓕(u) ]\n elif periodic: u ↦ Op(p)(x,D) ⋅ u ≈ ∫ eᶦˣᶿ p(x, ξ) 𝓕(u)(ξ) dξ based of FFT (quicker)\n elif dirichlet: u ↦ Op(p)(x,D) ⋅ u ≈ u ≈ ∫ eᶦˣᶿ p(x, ξ) 𝓕(u)(ξ) dξ (slower)\n Parameters ---------- u : ndarray Function to which the operator is applied x_grid : ndarray Spatial grid in x direction kx : ndarray Frequency grid in x direction boundary_condition : str 'periodic' or 'dirichlet' y_grid : ndarray, optional Spatial grid in y direction (for 2D) ky : ndarray, optional Frequency grid in y direction (for 2D) dealiasing_mask : ndarray, optional Dealiasing mask freq_window : str Frequency windowing ('gaussian' or 'hann') clamp : float Clamp symbol values to [-clamp, clamp] space_window : bool Apply spatial windowing Returns ------- ndarray Result of applying the operator """ # Check if symbol depends on spatial variables is_spatial = self._is_spatial_dependent() # Case 1: Constant symbol with periodic BC (fast path) if not is_spatial and boundary_condition == 'periodic': return self._apply_constant_fft(u, x_grid, kx, y_grid, ky, dealiasing_mask) # Case 2: Spatial symbol with periodic BC elif boundary_condition == 'periodic': symbol_func = self._get_symbol_func() return kohn_nirenberg_fft( u_vals=u, symbol_func=symbol_func, x_grid=x_grid, kx=kx, fft_func=self.fft, ifft_func=self.ifft, dim=self.dim, y_grid=y_grid, ky=ky, freq_window=freq_window, clamp=clamp, space_window=space_window ) # Case 3: Dirichlet BC (non-periodic) elif boundary_condition == 'dirichlet': symbol_func = self._get_symbol_func() if self.dim == 1: return kohn_nirenberg_nonperiodic( u_vals=u, x_grid=x_grid, xi_grid=kx, symbol_func=symbol_func, freq_window=freq_window, clamp=clamp, space_window=space_window ) elif self.dim == 2: return kohn_nirenberg_nonperiodic( u_vals=u, x_grid=(x_grid, y_grid), xi_grid=(kx, ky), symbol_func=symbol_func, freq_window=freq_window, clamp=clamp, space_window=space_window ) else: raise ValueError(f"Invalid boundary condition '{boundary_condition}'")
def _is_spatial_dependent(self): """ Check if the symbol depends on spatial variables. Returns ------- bool True if symbol depends on x (or x, y) """ if self.dim == 1: return self.symbol.has(self.vars_x[0]) elif self.dim == 2: x, y = self.vars_x return self.symbol.has(x) or self.symbol.has(y) else: return False def _get_symbol_func(self): """ Get a lambdified version of the symbol. Returns ------- callable Lambdified symbol function """ if self.dim == 1: x = self.vars_x[0] xi = symbols('xi', real=True) return lambdify((x, xi), self.symbol, 'numpy') elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) return lambdify((x, y, xi, eta), self.symbol, 'numpy') else: raise NotImplementedError("Only 1D and 2D supported") def _apply_constant_fft(self, u, x_grid, kx, y_grid, ky, dealiasing_mask): """ Apply a constant-coefficient pseudo-differential operator in Fourier space. This method assumes the symbol is diagonal in the Fourier basis and acts as a multiplication operator. It performs the operation: (ψu)(x) = 𝓕⁻¹[ -σ(k) · 𝓕[u](k) ] where: - σ(k) is the combined pseudo-differential operator symbol - 𝓕 denotes the forward Fourier transform - 𝓕⁻¹ denotes the inverse Fourier transform The dealiasing mask is applied before returning to physical space. Parameters ---------- u : ndarray Input function x_grid : ndarray Spatial grid (x) kx : ndarray Frequency grid (x) y_grid : ndarray, optional Spatial grid (y, for 2D) ky : ndarray, optional Frequency grid (y, for 2D) dealiasing_mask : ndarray, optional Dealiasing mask Returns ------- ndarray Result """ u_hat = self.fft(u) # Evaluate symbol at grid points if self.dim == 1: X_dummy = np.zeros_like(kx) symbol_vals = self.p_func(X_dummy, kx) elif self.dim == 2: KX, KY = np.meshgrid(kx, ky, indexing='ij') X_dummy = np.zeros_like(KX) Y_dummy = np.zeros_like(KY) symbol_vals = self.p_func(X_dummy, Y_dummy, KX, KY) else: raise ValueError("Only 1D and 2D supported") # Apply symbol u_hat *= symbol_vals # Apply dealiasing if dealiasing_mask is not None: u_hat *= dealiasing_mask return self.ifft(u_hat)
[docs] def principal_symbol(self, order=1): """ Compute the leading homogeneous component of the pseudo-differential symbol. This method extracts the principal part of the symbol, which is the dominant term under high-frequency asymptotics (|ξ| → ∞). The expansion is performed in polar coordinates for 2D symbols to maintain rotational symmetry, then converted back to Cartesian form. Parameters ---------- order : int Order of the asymptotic expansion in powers of 1/ρ, where ρ = |ξ| in 1D or ρ = sqrt(ξ² + η²) in 2D. Only the leading-order term is returned. Returns ------- sympy.Expr The principal symbol component, homogeneous of degree `m - order`, where `m` is the original symbol's order. Notes: - In 1D, uses direct series expansion in ξ. - In 2D, expands in radial variable ρ while preserving angular dependence. - Useful for microlocal analysis and constructing parametrices. """ p = self.symbol if self.dim == 1: xi = symbols('xi', real=True, positive=True) return simplify(series(p, xi, oo, n=order).removeO()) elif self.dim == 2: xi, eta = symbols('xi eta', real=True, positive=True) # Homogeneous radial expansion: we set (ξ, η) = ρ (cosθ, sinθ) rho, theta = symbols('rho theta', real=True, positive=True) p_rho = p.subs({xi: rho * cos(theta), eta: rho * sin(theta)}) expansion = series(p_rho, rho, oo, n=order).removeO() # Revert back to (ξ, η) expansion_cart = expansion.subs({rho: sqrt(xi**2 + eta**2), cos(theta): xi / sqrt(xi**2 + eta**2), sin(theta): eta / sqrt(xi**2 + eta**2)}) return simplify(powdenest(expansion_cart, force=True))
[docs] def is_homogeneous(self, tol=1e-10): """ Check whether the symbol is homogeneous in the frequency variables. Returns ------- (bool, Rational or float or None) Tuple (is_homogeneous, degree) where: - is_homogeneous: True if the symbol satisfies p(λξ, λη) = λ^m * p(ξ, η) - degree: the detected degree m if homogeneous, or None """ from sympy import symbols, simplify, expand, Eq from sympy.abc import l if self.dim == 1: xi = symbols('xi', real=True, positive=True) l = symbols('l', real=True, positive=True) p = self.symbol p_scaled = p.subs(xi, l * xi) ratio = simplify(p_scaled / p) if ratio.has(xi): return False, None try: deg = simplify(ratio).as_base_exp()[1] return True, deg except Exception: return False, None elif self.dim == 2: xi, eta = symbols('xi eta', real=True, positive=True) l = symbols('l', real=True, positive=True) p = self.symbol p_scaled = p.subs({xi: l * xi, eta: l * eta}) ratio = simplify(p_scaled / p) # If ratio == l**m with no (xi, eta) left, it's homogeneous if ratio.has(xi, eta): return False, None try: base, exp = ratio.as_base_exp() if base == l: return True, exp except Exception: pass return False, None
[docs] def symbol_order(self, max_order=10, tol=1e-3): """ Estimate the homogeneity order of the pseudo-differential symbol in high-frequency asymptotics. This method attempts to determine the leading-order behavior of the symbol p(x, ξ) or p(x, y, ξ, η) as |ξ| → ∞ (in 1D) or |(ξ, η)| → ∞ (in 2D). The returned value represents the asymptotic growth or decay rate, which is essential for understanding the regularity and mapping properties of the corresponding operator. The function uses symbolic preprocessing to ensure proper factorization of frequency variables, especially in sqrt and power expressions, to avoid erroneous order detection (e.g., due to hidden scaling). Parameters ---------- max_order : int, optional Maximum number of terms to consider in the series expansion. Default is 10. tol : float, optional Tolerance threshold for evaluating the coefficient magnitude. If the coefficient is too small, the detected order may be discarded. Default is 1e-3. Returns ------- float or None - If the symbol is homogeneous, returns its exact homogeneity degree as a float. - Otherwise, estimates the dominant asymptotic order from leading terms in the expansion. - Returns None if no valid order could be determined. Notes ----- - In 1D: Two strategies are used: 1. Expand directly in xi at infinity. 2. Substitute xi = 1/z and expand around z = 0. - In 2D: - Transform the symbol into polar coordinates: (xi, eta) = rho*(cos(theta), sin(theta)). - Expand in rho at infinity, then extract the leading term's power. - An alternative substitution using 1/z is also tried if the first method fails. - Preprocessing steps: - Sqrt expressions involving frequencies are rewritten to isolate the leading variable. - Power expressions are factored explicitly to ensure correct symbolic scaling. - If the symbol is not homogeneous, a warning is issued, and the result should be interpreted with care. - For non-homogeneous symbols, only the principal asymptotic term is considered. Raises ------ NotImplementedError If the spatial dimension is neither 1 nor 2. """ from sympy import ( symbols, series, simplify, sqrt, cos, sin, oo, powdenest, radsimp, expand, expand_power_base ) def preprocess_sqrt(expr, freq): return expr.replace( lambda e: e.func == sqrt and freq in e.free_symbols, lambda e: freq * sqrt(1 + (e.args[0] - freq**2) / freq**2) ) def preprocess_power(expr, freq): return expr.replace( lambda e: e.is_Pow and freq in e.free_symbols, lambda e: freq**e.exp * (1 + e.base / freq**e.base.as_powers_dict().get(freq, 0))**e.exp ) def validate_order(power, coeff, vars_x, tol): if power is None: return None if any(v in coeff.free_symbols for v in vars_x): print("⚠️ Coefficient depends on spatial variables; ignoring") return None try: coeff_val = abs(float(coeff.evalf())) if coeff_val < tol: print(f"⚠️ Coefficient too small ({coeff_val:.2e} < {tol})") return None except Exception as e: print(f"⚠️ Coefficient evaluation failed: {e}") return None return int(power) if power == int(power) else float(power) # Homogeneity check is_homog, degree = self.is_homogeneous() if is_homog: return float(degree) else: print("⚠️ The symbol is not homogeneous. The asymptotic order is not well defined.") if self.dim == 1: x = self.vars_x[0] xi = symbols('xi', real=True, positive=True) try: print("1D symbol_order - method 1") expr = preprocess_sqrt(self.symbol, xi) s = series(expr, xi, oo, n=max_order).removeO() lead = simplify(powdenest(s.as_leading_term(xi), force=True)) power = lead.as_powers_dict().get(xi, None) coeff = lead / xi**power if power is not None else 0 print("lead =", lead) print("power =", power) print("coeff =", coeff) order = validate_order(power, coeff, [x], tol) if order is not None: return order except Exception: pass try: print("1D symbol_order - method 2") z = symbols('z', real=True, positive=True) expr_z = preprocess_sqrt(self.symbol.subs(xi, 1/z), 1/z) s = series(expr_z, z, 0, n=max_order).removeO() lead = simplify(powdenest(s.as_leading_term(z), force=True)) power = lead.as_powers_dict().get(z, None) coeff = lead / z**power if power is not None else 0 print("lead =", lead) print("power =", power) print("coeff =", coeff) order = validate_order(power, coeff, [x], tol) if order is not None: return -order except Exception as e: print(f"⚠️ fallback z failed: {e}") return None elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True, positive=True) rho, theta = symbols('rho theta', real=True, positive=True) try: print("2D symbol_order - method 1") p_rho = self.symbol.subs({xi: rho * cos(theta), eta: rho * sin(theta)}) p_rho = preprocess_power(preprocess_sqrt(p_rho, rho), rho) s = series(simplify(p_rho), rho, oo, n=max_order).removeO() lead = radsimp(simplify(powdenest(s.as_leading_term(rho), force=True))) power = lead.as_powers_dict().get(rho, None) coeff = lead / rho**power if power is not None else 0 print("lead =", lead) print("power =", power) print("coeff =", coeff) order = validate_order(power, coeff, [x, y], tol) if order is not None: return order except Exception as e: print(f"⚠️ polar expansion failed: {e}") try: print("2D symbol_order - method 2") z = symbols('z', real=True, positive=True) xi_eta = {xi: (1/z) * cos(theta), eta: (1/z) * sin(theta)} p_rho = preprocess_sqrt(self.symbol.subs(xi_eta), 1/z) s = series(simplify(p_rho), z, 0, n=max_order).removeO() lead = radsimp(simplify(powdenest(s.as_leading_term(z), force=True))) power = lead.as_powers_dict().get(z, None) coeff = lead / z**power if power is not None else 0 print("lead =", lead) print("power =", power) print("coeff =", coeff) order = validate_order(power, coeff, [x, y], tol) if order is not None: return -order except Exception as e: print(f"⚠️ fallback z (2D) failed: {e}") return None else: raise NotImplementedError("Only 1D and 2D supported.")
[docs] def asymptotic_expansion(self, order=3): """ Compute the asymptotic expansion of the symbol as |ξ| → ∞ (high-frequency regime). This method expands the pseudo-differential symbol in inverse powers of the frequency variable(s), either in 1D or 2D. It handles both polynomial and exponential symbols by performing a series expansion in 1/|ξ| up to the specified order. The expansion is performed directly in Cartesian coordinates for 1D symbols. For 2D symbols, the method uses polar coordinates (ρ, θ) to perform the expansion at infinity in ρ, then converts the result back to Cartesian coordinates. Parameters ---------- order : int, optional Maximum order of the asymptotic expansion. Default is 3. Returns ------- sympy.Expr The asymptotic expansion of the symbol up to the given order, expressed in Cartesian coordinates. If expansion fails, returns the original unexpanded symbol. Notes: - In 1D: expansion is performed directly in terms of ξ. - In 2D: the symbol is first rewritten in polar coordinates (ρ,θ), expanded asymptotically in ρ → ∞, then converted back to Cartesian coordinates (ξ,η). - Handles special case when the symbol is an exponential function by expanding its argument. - Symbolic normalization is applied early (via `simplify`) for 2D expressions to improve convergence. - Robust to failures: catches exceptions and issues warnings instead of raising errors. - Final expression is simplified using `powdenest` and `expand` for improved readability. """ p = self.symbol if self.dim == 1: xi = symbols('xi', real=True, positive=True) try: # Case: exponential function if p.func == exp and len(p.args) == 1: arg = p.args[0] arg_series = series(arg, xi, oo, n=order).removeO() expanded = series(exp(expand(arg_series)), xi, oo, n=order).removeO() return simplify(powdenest(expanded, force=True)) else: expanded = series(p, xi, oo, n=order).removeO() return simplify(powdenest(expanded, force=True)) except Exception as e: print(f"Warning: 1D expansion failed: {e}") return p elif self.dim == 2: xi, eta = symbols('xi eta', real=True, positive=True) rho, theta = symbols('rho theta', real=True, positive=True) # Normalize before substitution p = simplify(p) # Substitute polar coordinates p_polar = p.subs({ xi: rho * cos(theta), eta: rho * sin(theta) }) try: # Handle exponentials if p_polar.func == exp and len(p_polar.args) == 1: arg = p_polar.args[0] arg_series = series(arg, rho, oo, n=order).removeO() expanded = series(exp(expand(arg_series)), rho, oo, n=order).removeO() else: expanded = series(p_polar, rho, oo, n=order).removeO() # Convert back to Cartesian norm = sqrt(xi**2 + eta**2) expansion_cart = expanded.subs({ rho: norm, cos(theta): xi / norm, sin(theta): eta / norm }) # Final simplifications result = simplify(powdenest(expansion_cart, force=True)) result = expand(result) return result except Exception as e: print(f"Warning: 2D expansion failed: {e}") return p
[docs] def compose_asymptotic(self, other, order=1, mode='kn', sign_convention=None): """ Compose two pseudo-differential operators using an asymptotic expansion in the chosen quantization scheme (Kohn–Nirenberg or Weyl). Parameters ---------- other : PseudoDifferentialOperator The operator to compose with this one. order : int, default=1 Maximum order of the asymptotic expansion. mode : {'kn', 'weyl'}, default='kn' Quantization mode: - 'kn' : Kohn–Nirenberg quantization (left-quantized) - 'weyl' : Weyl symmetric quantization sign_convention : {'standard', 'inverse'}, optional Controls the phase factor convention for the KN case: - 'standard' → (i)^(-n), gives [x, ξ] = +i (physics convention) - 'inverse' → (i)^(+n), gives [x, ξ] = -i (mathematical adjoint convention) If None, defaults to 'standard'. Returns ------- sympy.Expr Symbolic expression for the composed symbol up to the given order. Notes ----- - In 1D (Kohn–Nirenberg): (p ∘ q)(x, ξ) ~ Σₙ (1/n!) (i sgn)^n ∂_ξⁿ p(x, ξ) ∂_xⁿ q(x, ξ) - In 1D (Weyl): (p # q)(x, ξ) = exp[(i/2)(∂_ξ^p ∂_x^q - ∂_x^p ∂_ξ^q)] p(x, ξ) q(x, ξ) truncated at given order. Examples -------- X = a*x, Y = b*ξ X_op.compose_asymptotic(Y_op, order=3, mode='weyl') """ from sympy import diff, factorial, simplify, symbols assert self.dim == other.dim, "Operator dimensions must match" p, q = self.symbol, other.symbol # Default sign convention if sign_convention is None: sign_convention = 'standard' sign = -1 if sign_convention == 'standard' else +1 # --- 1D case --- if self.dim == 1: x = self.vars_x[0] xi = symbols('xi', real=True) result = 0 if mode == 'kn': # Kohn–Nirenberg for n in range(order + 1): term = (1 / factorial(n)) * diff(p, xi, n) * diff(q, x, n) * (1j) ** (sign * n) result += term elif mode == 'weyl': # Weyl symmetric composition # Weyl star product: exp((i/2)(∂_ξ^p ∂_x^q - ∂_x^p ∂_ξ^q)) result = 0 for n in range(order + 1): for k in range(n + 1): # k derivatives acting as (∂_ξ^k p)(∂_x^(n−k) q) coeff = (1 / (factorial(k) * factorial(n - k))) * ((1j / 2) ** n) * ((-1) ** (n - k)) term = coeff * diff(p, xi, k, x, n - k, evaluate=True) * diff(q, x, k, xi, n - k, evaluate=True) result += term else: raise ValueError("mode must be either 'kn' or 'weyl'") return simplify(result) # --- 2D case --- elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) result = 0 if mode == 'kn': for n in range(order + 1): for i in range(n + 1): j = n - i term = (1 / (factorial(i) * factorial(j))) * \ diff(p, xi, i, eta, j) * diff(q, x, i, y, j) * (1j) ** (sign * n) result += term elif mode == 'weyl': for n in range(order + 1): for i in range(n + 1): j = n - i coeff = (1 / (factorial(i) * factorial(j))) * ((1j / 2) ** n) * ((-1) ** (n - i)) term = coeff * diff(p, xi, i, eta, j, x, 0, y, 0) * diff(q, x, i, y, j, xi, 0, eta, 0) result += term else: raise ValueError("mode must be either 'kn' or 'weyl'") return simplify(result) else: raise NotImplementedError("Only 1D and 2D cases are implemented")
[docs] def commutator_symbolic(self, other, order=1, mode='kn', sign_convention=None): """ Compute the symbolic commutator [A, B] = A∘B − B∘A of two pseudo-differential operators using formal asymptotic expansion of their composition symbols. This method computes the asymptotic expansion of the commutator's symbol up to a given order, based on the symbolic calculus of pseudo-differential operators in the Kohn–Nirenberg quantization. The result is a purely symbolic sympy expression that captures the leading-order noncommutativity of the operators. Parameters ---------- other : PseudoDifferentialOperator The pseudo-differential operator B to commute with this operator A. order : int, default=1 Maximum order of the asymptotic expansion. - order=1 yields the leading term proportional to the Poisson bracket {p, q}. - Higher orders include correction terms involving higher mixed derivatives. Returns ------- sympy.Expr Symbolic expression for the asymptotic expansion of the commutator symbol σ([A,B]) = σ(A∘B − B∘A). """ assert self.dim == other.dim, "Operator dimensions must match" p, q = self.symbol, other.symbol pq = self.compose_asymptotic(other, order=order, mode=mode, sign_convention=sign_convention) qp = other.compose_asymptotic(self, order=order, mode=mode, sign_convention=sign_convention) comm_symbol = simplify(pq-qp) return comm_symbol
[docs] def right_inverse_asymptotic(self, order=1): """ Construct a formal right inverse R of the pseudo-differential operator P such that the composition P ∘ R equals the identity plus a smoothing operator of order -order. This method computes an asymptotic expansion for the right inverse using recursive corrections based on derivatives of the symbol p(x, ξ) and lower-order terms of R. Parameters ---------- order : int Number of terms to include in the asymptotic expansion. Higher values improve approximation at the cost of complexity and computational effort. Returns ------- sympy.Expr The symbolic expression representing the formal right inverse R(x, ξ), which satisfies: P ∘ R = Id + O(⟨ξ⟩^{-order}), where ⟨ξ⟩ = (1 + |ξ|²)^{1/2}. Notes ----- - In 1D: The recursion involves spatial derivatives of R and derivatives of p with respect to ξ. - In 2D: The multi-index generalization is used with mixed derivatives in ξ and η. - The construction relies on the non-vanishing of the principal symbol p to ensure invertibility. - Each term in the expansion corresponds to higher-order corrections involving commutators between the operator P and the current approximation of R. """ p = self.symbol if self.dim == 1: x = self.vars_x[0] xi = symbols('xi', real=True) r = 1 / p.subs(xi, xi) # r0 R = r for n in range(1, order + 1): term = 0 for k in range(1, n + 1): coeff = (1j)**(-k) / factorial(k) inner = diff(p, xi, k) * diff(R, x, k) term += coeff * inner R = R - r * term elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) r = 1 / p.subs({xi: xi, eta: eta}) R = r for n in range(1, order + 1): term = 0 for k1 in range(n + 1): for k2 in range(n + 1 - k1): if k1 + k2 == 0: continue coeff = (1j)**(-(k1 + k2)) / (factorial(k1) * factorial(k2)) dp = diff(p, xi, k1, eta, k2) dR = diff(R, x, k1, y, k2) term += coeff * dp * dR R = R - r * term return R
[docs] def left_inverse_asymptotic(self, order=1): """ Construct a formal left inverse L such that the composition L ∘ P equals the identity operator up to terms of order ξ^{-order}. This expansion is performed asymptotically at infinity in the frequency variable(s). The left inverse is built iteratively using symbolic differentiation and the method of asymptotic expansions for pseudo-differential operators. It ensures that: L(P(x,ξ),x,D) ∘ P(x,D) = Id + smoothing operator of order -order Parameters ---------- order : int, optional Maximum number of terms in the asymptotic expansion (default is 1). Higher values yield more accurate inverses at the cost of increased computational complexity. Returns ------- sympy.Expr Symbolic expression representing the principal symbol of the formal left inverse operator L(x,ξ). This expression depends on spatial variables and frequencies, and includes correction terms up to the specified order. Notes ----- - In 1D: Uses recursive application of the Leibniz formula for symbols. - In 2D: Generalizes to multi-indices for mixed derivatives in (x,y) and (ξ,η). - Each term involves combinations of derivatives of the original symbol p(x,ξ) and previously computed terms of the inverse. - Coefficients include powers of 1j (i) and factorial normalization for derivative terms. """ p = self.symbol if self.dim == 1: x = self.vars_x[0] xi = symbols('xi', real=True) l = 1 / p.subs(xi, xi) L = l for n in range(1, order + 1): term = 0 for k in range(1, n + 1): coeff = (1j)**(-k) / factorial(k) inner = diff(L, xi, k) * diff(p, x, k) term += coeff * inner L = L - term * l elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) l = 1 / p.subs({xi: xi, eta: eta}) L = l for n in range(1, order + 1): term = 0 for k1 in range(n + 1): for k2 in range(n + 1 - k1): if k1 + k2 == 0: continue coeff = (1j)**(-(k1 + k2)) / (factorial(k1) * factorial(k2)) dp = diff(p, x, k1, y, k2) dL = diff(L, xi, k1, eta, k2) term += coeff * dL * dp L = L - term * l return L
[docs] def formal_adjoint(self): """ Compute the formal adjoint symbol P* of the pseudo-differential operator. The adjoint is defined such that for any test functions u and v, ⟨P u, v⟩ = ⟨u, P* v⟩ holds in the distributional sense. This is obtained by taking the complex conjugate of the symbol and expanding it asymptotically at infinity to ensure proper behavior under integration by parts. Returns ------- sympy.Expr The adjoint symbol P*(x, ξ) in 1D or P*(x, y, ξ, η) in 2D. Notes: - In 1D, the expansion is performed in powers of 1/|ξ|. - In 2D, the expansion is radial in |ξ| = sqrt(ξ² + η²). - This method ensures symbolic simplifications for readability and efficiency. """ p = self.symbol if self.dim == 1: x, = self.vars_x xi = symbols('xi', real=True) p_star = conjugate(p) p_star = simplify(series(p_star, xi, oo, n=6).removeO()) return p_star elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) p_star = conjugate(p) p_star = simplify(series(p_star, sqrt(xi**2 + eta**2), oo, n=6).removeO()) return p_star
[docs] def exponential_symbol(self, t=1.0, order=1, mode='kn', sign_convention=None): """ Compute the symbol of exp(tP) using asymptotic expansion methods. This method calculates the exponential of a pseudo-differential operator using either a direct power series expansion or a Magnus expansion, depending on the structure of the symbol. The result is valid up to the specified asymptotic order. Parameters ---------- t : float or sympy.Symbol, default=1.0 Time or evolution parameter. Common uses: - t = -i*τ for Schrödinger evolution: exp(-iτH) - t = τ for heat/diffusion: exp(τΔ) - t for general propagators order : int, default=3 Maximum order of the asymptotic expansion. Higher orders include more composition terms, improving accuracy for small t or when non-commutativity effects are significant. Returns ------- sympy.Expr Symbolic expression for the exponential operator symbol, computed as an asymptotic series up to the specified order. Notes ----- - For commutative symbols (e.g., pure multiplication operators), the exponential is exact: exp(tP) = exp(t*p(x,ξ)). - For general non-commutative operators, the method uses the BCH-type expansion via iterated composition: exp(tP) ~ I + tP + (t²/2!)P∘P + (t³/3!)P∘P∘P + ... - Each power P^n is computed via compose_asymptotic, which accounts for the non-commutativity through derivative terms. - The expansion is valid for |t| small enough or when the symbol has appropriate decay/growth properties. - In quantum mechanics (Schrödinger): U(t) = exp(-itH/ℏ) represents the time evolution operator. - In parabolic PDEs (heat equation): exp(tΔ) is the heat kernel. """ if self.dim == 1: x = self.vars_x[0] xi = symbols('xi', real=True) # Initialize with identity result = 1 # First order term: tP current_power = self.symbol result += t * current_power # Higher order terms: (t^n/n!) P^n computed via composition for n in range(2, order + 1): # Compute P^n = P^(n-1) ∘ P via asymptotic composition # We use a temporary operator for composition temp_op = PseudoDifferentialOperator( current_power, [x], mode='symbol' ) current_power = temp_op.compose_asymptotic(self, order=order, mode=mode, sign_convention=sign_convention) # Add term (t^n/n!) * P^n coeff = t**n / factorial(n) result += coeff * current_power return simplify(result) elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) # Initialize with identity result = 1 # First order term: tP current_power = self.symbol result += t * current_power # Higher order terms: (t^n/n!) P^n computed via composition for n in range(2, order + 1): # Compute P^n = P^(n-1) ∘ P via asymptotic composition temp_op = PseudoDifferentialOperator( current_power, [x, y], mode='symbol' ) current_power = temp_op.compose_asymptotic(self, order=order, mode=mode, sign_convention=sign_convention) # Add term (t^n/n!) * P^n coeff = t**n / factorial(n) result += coeff * current_power return simplify(result) else: raise NotImplementedError("Only 1D and 2D operators are supported")
[docs] def trace_formula(self, volume_element=None, numerical=False, x_bounds=None, xi_bounds=None): """ Compute the semiclassical trace of the pseudo-differential operator. The trace formula relates the quantum trace of an operator to a phase-space integral of its symbol, providing a fundamental link between classical and quantum mechanics. This implementation supports both symbolic and numerical integration. Parameters ---------- volume_element : sympy.Expr, optional Custom volume element for the phase space integration. If None, uses the standard Liouville measure dx dξ/(2π)^d. numerical : bool, default=False If True, perform numerical integration over specified bounds. If False, attempt symbolic integration (may fail for complex symbols). x_bounds : tuple of tuples, optional Spatial integration bounds. For 1D: ((x_min, x_max),) For 2D: ((x_min, x_max), (y_min, y_max)) Required if numerical=True. xi_bounds : tuple of tuples, optional Frequency integration bounds. For 1D: ((xi_min, xi_max),) For 2D: ((xi_min, xi_max), (eta_min, eta_max)) Required if numerical=True. Returns ------- sympy.Expr or float The trace of the operator. Returns a symbolic expression if numerical=False, or a float if numerical=True. Notes ----- - The semiclassical trace formula states: Tr(P) = (2π)^{-d} ∫∫ p(x,ξ) dx dξ where d is the spatial dimension and p(x,ξ) is the operator symbol. - For 1D: Tr(P) = (1/2π) ∫_{-∞}^{∞} ∫_{-∞}^{∞} p(x,ξ) dx dξ - For 2D: Tr(P) = (1/4π²) ∫∫∫∫ p(x,y,ξ,η) dx dy dξ dη - This formula is exact for trace-class operators and provides an asymptotic approximation for general pseudo-differential operators. - Physical interpretation: the trace counts the "number of states" weighted by the observable p(x,ξ). - For projection operators (χ_Ω with χ² = χ), the trace gives the dimension of the range, related to the phase space volume of Ω. - The factor (2π)^{-d} comes from the quantum normalization of coherent states / Weyl quantization. """ from sympy import integrate, simplify, lambdify from scipy.integrate import dblquad, nquad p = self.symbol if numerical: if x_bounds is None or xi_bounds is None: raise ValueError( "x_bounds and xi_bounds must be provided for numerical integration" ) if self.dim == 1: x, = self.vars_x xi = symbols('xi', real=True) if volume_element is None: volume_element = 1 / (2 * pi) if numerical: # Numerical integration p_func = lambdify((x, xi), p, 'numpy') (x_min, x_max), = x_bounds (xi_min, xi_max), = xi_bounds def integrand(xi_val, x_val): return p_func(x_val, xi_val) result, error = dblquad( integrand, x_min, x_max, lambda x: xi_min, lambda x: xi_max ) result *= float(volume_element) print(f"Numerical trace = {result:.6e} ± {error:.6e}") return result else: # Symbolic integration integrand = p * volume_element try: # Try to integrate over xi first, then x integral_xi = integrate(integrand, (xi, -oo, oo)) integral_x = integrate(integral_xi, (x, -oo, oo)) return simplify(integral_x) except: print("Warning: Symbolic integration failed. Try numerical=True") return integrate(integrand, (xi, -oo, oo), (x, -oo, oo)) elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) if volume_element is None: volume_element = 1 / (4 * pi**2) if numerical: # Numerical integration in 4D p_func = lambdify((x, y, xi, eta), p, 'numpy') (x_min, x_max), (y_min, y_max) = x_bounds (xi_min, xi_max), (eta_min, eta_max) = xi_bounds def integrand(eta_val, xi_val, y_val, x_val): return p_func(x_val, y_val, xi_val, eta_val) result, error = nquad( integrand, [ [eta_min, eta_max], [xi_min, xi_max], [y_min, y_max], [x_min, x_max] ] ) result *= float(volume_element) print(f"Numerical trace = {result:.6e} ± {error:.6e}") return result else: # Symbolic integration integrand = p * volume_element try: # Integrate in order: eta, xi, y, x integral_eta = integrate(integrand, (eta, -oo, oo)) integral_xi = integrate(integral_eta, (xi, -oo, oo)) integral_y = integrate(integral_xi, (y, -oo, oo)) integral_x = integrate(integral_y, (x, -oo, oo)) return simplify(integral_x) except: print("Warning: Symbolic integration failed. Try numerical=True") return integrate( integrand, (eta, -oo, oo), (xi, -oo, oo), (y, -oo, oo), (x, -oo, oo) ) else: raise NotImplementedError("Only 1D and 2D operators are supported")
[docs] def pseudospectrum_analysis(self, x_grid, lambda_real_range, lambda_imag_range, epsilon_levels=[0.1, 0.01, 0.001, 0.0001], resolution=100, method='spectral', L=None, N=None, use_sparse=False, parallel=True, n_workers=4, adaptive=False, adaptive_threshold=0.5, auto_range=True, plot=True): """ Compute and visualize the pseudospectrum of the operator. Optimizations: - Uses apply() method instead of manual loops - Parallel computation of resolvent norms - Sparse matrix support for large N - Optional adaptive grid refinement Parameters ---------- x_grid : array Spatial grid for quantization lambda_real_range : tuple (min, max) for real part of λ lambda_imag_range : tuple (min, max) for imaginary part of λ epsilon_levels : list Levels for ε-pseudospectrum contours resolution : int Grid resolution for λ sampling method : str 'spectral' or 'finite_difference' L : float, optional Domain half-length for spectral method N : int, optional Number of grid points use_sparse : bool Use sparse matrices for large N parallel : bool Enable parallel computation n_workers : int Number of parallel workers adaptive : bool Use adaptive grid refinement adaptive_threshold : float Threshold for adaptive refinement Returns ------- dict Dictionary with pseudospectrum data and operator matrix """ if self.dim != 1: raise NotImplementedError('Pseudospectrum analysis currently supports 1D only') # Step 1: Build operator matrix print(f"Building operator matrix using '{method}' method...") H, x_grid_used, k_grid = self._build_operator_matrix(x_grid, method, L, N) N_actual = H.shape[0] # Step 1.5: Compute eigenvalues FIRST to adjust range if needed print('Computing eigenvalues...') eigenvalues = self._compute_eigenvalues(H, use_sparse) # Auto-adjust range if requested if auto_range and eigenvalues is not None: eig_real_min, eig_real_max = eigenvalues.real.min(), eigenvalues.real.max() eig_imag_min, eig_imag_max = eigenvalues.imag.min(), eigenvalues.imag.max() # Add 20% margin around eigenvalues margin_real = 0.2 * (eig_real_max - eig_real_min + 1) margin_imag = max(0.2 * (eig_imag_max - eig_imag_min + 1), 2.0) lambda_real_range = (eig_real_min - margin_real, eig_real_max + margin_real) lambda_imag_range = (eig_imag_min - margin_imag, eig_imag_max + margin_imag) print(f'Auto-adjusted λ range:') print(f' Re(λ) ∈ [{lambda_real_range[0]:.2f}, {lambda_real_range[1]:.2f}]') print(f' Im(λ) ∈ [{lambda_imag_range[0]:.2f}, {lambda_imag_range[1]:.2f}]') # Step 2: Compute pseudospectrum with corrected range print(f'Computing pseudospectrum over {resolution}×{resolution} grid...') if adaptive: print('Using adaptive grid refinement...') Lambda, resolvent_norm, sigma_min_grid = self._compute_pseudospectrum_adaptive( H, lambda_real_range, lambda_imag_range, resolution, use_sparse=use_sparse, parallel=parallel, n_workers=n_workers, threshold=adaptive_threshold ) else: Lambda, resolvent_norm, sigma_min_grid = self._compute_pseudospectrum( H, lambda_real_range, lambda_imag_range, resolution, use_sparse=use_sparse, parallel=parallel, n_workers=n_workers ) # Step 3: Visualize if plot: self._plot_pseudospectrum(Lambda, resolvent_norm, sigma_min_grid, epsilon_levels, eigenvalues) return { 'lambda_grid': Lambda, 'resolvent_norm': resolvent_norm, 'sigma_min': sigma_min_grid, 'epsilon_levels': epsilon_levels, 'eigenvalues': eigenvalues, 'operator_matrix': H, 'x_grid': x_grid_used, 'k_grid': k_grid }
def _build_operator_matrix(self, x_grid, method, L, N): """ Build the discrete operator matrix H. Optimized to use the apply() method instead of manual integration. Parameters ---------- x_grid : array Input spatial grid method : str 'spectral' or 'finite_difference' L : float, optional Domain half-length N : int, optional Number of grid points Returns ------- H : ndarray Operator matrix (N×N) x_grid_used : ndarray Actual spatial grid used k_grid : ndarray Frequency grid """ if method == 'spectral': # Setup spectral grid if L is None: L = (x_grid[-1] - x_grid[0]) / 2.0 if N is None: N = len(x_grid) x_grid_spectral = np.linspace(-L, L, N, endpoint=False) dx = x_grid_spectral[1] - x_grid_spectral[0] k = np.fft.fftfreq(N, d=dx) * 2.0 * np.pi # Build matrix by applying operator to canonical basis # This is the KEY OPTIMIZATION: use apply() instead of manual loops H = np.zeros((N, N), dtype=complex) for j in range(N): # Create basis vector e_j e_j = np.zeros(N, dtype=complex) e_j[j] = 1.0 # Apply operator using the existing apply() method # This automatically handles the symbol evaluation and FFT operations H[:, j] = self.apply( e_j, x_grid_spectral, k, boundary_condition='periodic' ) print(f'Operator quantized via apply() method: {N}×{N} matrix') return H, x_grid_spectral, k elif method == 'finite_difference': # Fallback to finite difference (keep original implementation) N = len(x_grid) dx = x_grid[1] - x_grid[0] H = np.zeros((N, N), dtype=complex) for i in range(N): for j in range(N): if i == j: H[i, j] = self.p_func(x_grid[i], 0.0) elif abs(i - j) == 1: xi_approx = np.pi / dx H[i, j] = self.p_func( (x_grid[i] + x_grid[j]) / 2, xi_approx * np.sign(i - j) ) / (2 * dx) elif abs(i - j) == 2: xi_approx = 2 * np.pi / dx H[i, j] = self.p_func( (x_grid[i] + x_grid[j]) / 2, xi_approx ) / dx ** 2 print(f'Operator quantized via finite differences: {N}×{N} matrix') k = np.fft.fftfreq(N, d=dx) * 2.0 * np.pi return H, x_grid, k else: raise ValueError("method must be 'spectral' or 'finite_difference'") def _compute_pseudospectrum(self, H, lambda_real_range, lambda_imag_range, resolution, use_sparse=False, parallel=True, n_workers=4): """ Compute pseudospectrum on a uniform grid. Optimized with parallel computation and optional sparse matrices. Parameters ---------- H : ndarray or sparse matrix Operator matrix lambda_real_range : tuple Range for Re(λ) lambda_imag_range : tuple Range for Im(λ) resolution : int Grid resolution use_sparse : bool Use sparse SVD for large matrices parallel : bool Enable parallel computation n_workers : int Number of parallel workers Returns ------- Lambda : ndarray Complex grid of λ values resolvent_norm : ndarray Norm of (H - λI)^{-1} sigma_min_grid : ndarray Smallest singular value σ_min(H - λI) """ from scipy.linalg import svdvals N = H.shape[0] lambda_re = np.linspace(*lambda_real_range, resolution) lambda_im = np.linspace(*lambda_imag_range, resolution) Lambda_re, Lambda_im = np.meshgrid(lambda_re, lambda_im) Lambda = Lambda_re + 1j * Lambda_im resolvent_norm = np.zeros_like(Lambda, dtype=float) sigma_min_grid = np.zeros_like(Lambda, dtype=float) I = np.eye(N) # Convert to sparse if requested and beneficial if use_sparse and N > 100: from scipy.sparse import csr_matrix, eye as sparse_eye from scipy.sparse.linalg import svds H_sparse = csr_matrix(H) I_sparse = sparse_eye(N, format='csr') use_sparse_svd = True print(f'Using sparse matrices (N={N})') else: use_sparse_svd = False if parallel and resolution * resolution > 100: # Parallel computation Lambda_flat = Lambda.ravel() def compute_single_point(idx): """Compute resolvent norm for a single λ value""" lam = Lambda_flat[idx] try: if use_sparse_svd: # Sparse SVD: compute only smallest singular value A = H_sparse - lam * I_sparse try: # svds can be unstable, wrap in try-except s_min = svds(A, k=1, which='SM', return_singular_vectors=False)[0] except: # Fallback to dense computation s = svdvals(A.toarray()) s_min = s[-1] else: # Dense SVD A = H - lam * I s = svdvals(A) s_min = s[-1] return idx, 1.0 / (s_min + 1e-16), s_min except Exception as e: return idx, np.nan, np.nan # Use ThreadPoolExecutor for parallel computation with ThreadPoolExecutor(max_workers=n_workers) as executor: futures = {executor.submit(compute_single_point, idx): idx for idx in range(len(Lambda_flat))} # Progress tracking completed = 0 total = len(futures) progress_interval = max(1, total // 10) # FIX: Ensure at least 1 for future in as_completed(futures): idx, res_norm, s_min = future.result() resolvent_norm.ravel()[idx] = res_norm sigma_min_grid.ravel()[idx] = s_min completed += 1 if completed % progress_interval == 0: # FIX: Use progress_interval print(f'Progress: {completed}/{total} ({100*completed//total}%)') else: # Sequential computation progress_interval = max(1, resolution // 10) # FIX: Ensure at least 1 for i in range(resolution): for j in range(resolution): lam = Lambda[i, j] try: if use_sparse_svd: A = H_sparse - lam * I_sparse try: s_min = svds(A, k=1, which='SM', return_singular_vectors=False)[0] except: s = svdvals(A.toarray()) s_min = s[-1] else: A = H - lam * I s = svdvals(A) s_min = s[-1] sigma_min_grid[i, j] = s_min resolvent_norm[i, j] = 1.0 / (s_min + 1e-16) except Exception: resolvent_norm[i, j] = np.nan sigma_min_grid[i, j] = np.nan if i % progress_interval == 0: # FIX: Use progress_interval print(f'Progress: {i}/{resolution} rows') return Lambda, resolvent_norm, sigma_min_grid def _compute_pseudospectrum_adaptive(self, H, lambda_real_range, lambda_imag_range, base_resolution, use_sparse=False, parallel=True, n_workers=4, threshold=0.5, max_refinements=2): """ Compute pseudospectrum with adaptive grid refinement. Starts with coarse grid and refines regions with high gradients. Parameters ---------- H : ndarray Operator matrix lambda_real_range : tuple Range for Re(λ) lambda_imag_range : tuple Range for Im(λ) base_resolution : int Initial coarse resolution use_sparse : bool Use sparse matrices parallel : bool Enable parallel computation n_workers : int Number of workers threshold : float Gradient threshold for refinement max_refinements : int Maximum number of refinement levels Returns ------- Lambda : ndarray Complex grid (may be non-uniform) resolvent_norm : ndarray Resolvent norms sigma_min_grid : ndarray Smallest singular values """ # Start with coarse grid coarse_res = base_resolution // 2 print(f'Level 0: Computing coarse grid ({coarse_res}×{coarse_res})...') Lambda_coarse, resolvent_coarse, sigma_coarse = self._compute_pseudospectrum( H, lambda_real_range, lambda_imag_range, coarse_res, use_sparse=use_sparse, parallel=parallel, n_workers=n_workers ) # Compute gradient to identify regions needing refinement log_resolvent = np.log10(resolvent_coarse + 1e-16) grad_y, grad_x = np.gradient(log_resolvent) grad_magnitude = np.sqrt(grad_x**2 + grad_y**2) # Normalize gradient grad_normalized = grad_magnitude / (np.max(grad_magnitude) + 1e-10) # For now, return uniform fine grid # (Full adaptive implementation would require irregular grids) print(f'Level 1: Computing fine grid ({base_resolution}×{base_resolution})...') Lambda_fine, resolvent_fine, sigma_fine = self._compute_pseudospectrum( H, lambda_real_range, lambda_imag_range, base_resolution, use_sparse=use_sparse, parallel=parallel, n_workers=n_workers ) high_gradient_pct = 100 * np.sum(grad_normalized > threshold) / grad_normalized.size print(f'High-gradient regions: {high_gradient_pct:.1f}% of domain') return Lambda_fine, resolvent_fine, sigma_fine def _compute_eigenvalues(self, H, use_sparse=False): """ Compute eigenvalues of operator matrix. Parameters ---------- H : ndarray Operator matrix use_sparse : bool Use sparse eigenvalue solver Returns ------- eigenvalues : ndarray or None Eigenvalues of H """ try: if use_sparse and H.shape[0] > 100: from scipy.sparse.linalg import eigs from scipy.sparse import csr_matrix H_sparse = csr_matrix(H) k = min(20, H.shape[0] - 2) eigenvalues = eigs(H_sparse, k=k, return_eigenvectors=False) else: eigenvalues = np.linalg.eigvals(H) # Print diagnostics print(f'Eigenvalue range: [{eigenvalues.real.min():.2f}, {eigenvalues.real.max():.2f}]') print(f'Imaginary part range: [{eigenvalues.imag.min():.2e}, {eigenvalues.imag.max():.2e}]') return eigenvalues except Exception as e: warnings.warn(f'Eigenvalue computation failed: {e}') return None def _plot_pseudospectrum(self, Lambda, resolvent_norm, sigma_min_grid, epsilon_levels, eigenvalues): """ Plot pseudospectrum results. Parameters ---------- Lambda : ndarray Complex λ grid resolvent_norm : ndarray Resolvent norms sigma_min_grid : ndarray Smallest singular values epsilon_levels : list Contour levels eigenvalues : ndarray or None Eigenvalues to overlay """ Lambda_re = Lambda.real Lambda_im = Lambda.imag plt.figure(figsize=(14, 6)) # Left plot: ε-pseudospectrum plt.subplot(1, 2, 1) # Better contour level computation log_resolvent = np.log10(resolvent_norm + 1e-16) levels_log = np.log10(1.0 / np.array(epsilon_levels)) # Only plot contours that exist in the data range valid_levels = [lv for lv in levels_log if log_resolvent.min() <= lv <= log_resolvent.max()] if len(valid_levels) > 0: cs = plt.contour(Lambda_re, Lambda_im, log_resolvent, levels=valid_levels, colors='blue', linewidths=1.5) # Better labels labels = [f'ε={eps:.0e}' for eps in epsilon_levels[:len(valid_levels)]] fmt = dict(zip(cs.levels, labels)) plt.clabel(cs, inline=True, fmt=fmt, fontsize=9) else: print('⚠️ Warning: No contours in specified epsilon range') # Plot general contours cs = plt.contour(Lambda_re, Lambda_im, log_resolvent, levels=10, colors='blue', linewidths=1.5) if eigenvalues is not None: plt.plot(eigenvalues.real, eigenvalues.imag, 'r*', markersize=10, label='Eigenvalues', markeredgecolor='darkred') plt.xlabel('Re(λ)', fontsize=12) plt.ylabel('Im(λ)', fontsize=12) plt.title('ε-Pseudospectrum: log₁₀(‖(H - λI)⁻¹‖)', fontsize=13) plt.grid(alpha=0.3) plt.legend(fontsize=10) plt.axis('equal') # Right plot: Smallest singular value plt.subplot(1, 2, 2) # Use better colormap normalization from matplotlib.colors import LogNorm # Filter out invalid values sigma_plot = np.where(np.isfinite(sigma_min_grid), sigma_min_grid, np.nan) vmin = np.nanmin(sigma_plot[sigma_plot > 0]) if np.any(sigma_plot > 0) else 1e-10 vmax = np.nanmax(sigma_plot) cs2 = plt.contourf(Lambda_re, Lambda_im, sigma_plot, levels=50, cmap='viridis', norm=LogNorm(vmin=vmin, vmax=vmax)) plt.colorbar(cs2, label='σ_min(H - λI)') if eigenvalues is not None: plt.plot(eigenvalues.real, eigenvalues.imag, 'r*', markersize=10, markeredgecolor='darkred') # Plot epsilon contours for eps in epsilon_levels: cs_eps = plt.contour(Lambda_re, Lambda_im, sigma_plot, levels=[eps], colors='red', linewidths=2, alpha=0.8) plt.xlabel('Re(λ)', fontsize=12) plt.ylabel('Im(λ)', fontsize=12) plt.title('Smallest singular value σ_min(H - λI)', fontsize=13) plt.grid(alpha=0.3) plt.axis('equal') plt.tight_layout() plt.show()
[docs] def symplectic_flow(self): """ Compute the Hamiltonian vector field associated with the principal symbol. This method derives the canonical equations of motion for the phase space variables (x, ξ) in 1D or (x, y, ξ, η) in 2D, based on the Hamiltonian formalism. These describe how position and frequency variables evolve under the flow generated by the symbol. Returns ------- dict A dictionary containing the components of the Hamiltonian vector field: - In 1D: keys are 'dx/dt' and 'dxi/dt', corresponding to dx/dt = ∂p/∂ξ and dξ/dt = -∂p/∂x. - In 2D: keys are 'dx/dt', 'dy/dt', 'dxi/dt', and 'deta/dt', with similar definitions: dx/dt = ∂p/∂ξ, dy/dt = ∂p/∂η, dξ/dt = -∂p/∂x, dη/dt = -∂p/∂y. Notes ----- - The Hamiltonian here is the principal symbol p(x, ξ) itself. - This flow preserves the symplectic structure of phase space. """ if self.dim == 1: x, = self.vars_x xi = symbols('xi', real=True) return { 'dx/dt': diff(self.symbol, xi), 'dxi/dt': -diff(self.symbol, x) } elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) return { 'dx/dt': diff(self.symbol, xi), 'dy/dt': diff(self.symbol, eta), 'dxi/dt': -diff(self.symbol, x), 'deta/dt': -diff(self.symbol, y) }
[docs] def is_elliptic_numerically(self, x_grid, xi_grid, threshold=1e-8): """ Check if the pseudo-differential symbol p(x, ξ) is elliptic over a given grid. A symbol is considered elliptic if its magnitude |p(x, ξ)| remains bounded away from zero across all points in the spatial-frequency domain. This method evaluates the symbol on a grid of spatial and frequency coordinates and checks whether its minimum absolute value exceeds a specified threshold. Resampling is applied to large grids to prevent excessive memory usage, particularly in 2D. Parameters ---------- x_grid : ndarray Spatial grid: either a 1D array (x) or a tuple of two 1D arrays (x, y). xi_grid : ndarray Frequency grid: either a 1D array (ξ) or a tuple of two 1D arrays (ξ, η). threshold : float, optional Minimum acceptable value for |p(x, ξ)|. If the smallest evaluated symbol value falls below this, the symbol is not considered elliptic. Returns ------- bool True if the symbol is elliptic on the resampled grid, False otherwise. """ RESAMPLE_SIZE = 32 if self.dim == 1: x_vals = x_grid xi_vals = xi_grid # Resampling if necessary if len(x_vals) > RESAMPLE_SIZE: x_vals = np.linspace(x_vals.min(), x_vals.max(), RESAMPLE_SIZE) if len(xi_vals) > RESAMPLE_SIZE: xi_vals = np.linspace(xi_vals.min(), xi_vals.max(), RESAMPLE_SIZE) # Ensure grid includes zero for frequency variable (critical for ellipticity check) if 0 not in xi_vals: xi_vals = np.append(xi_vals, 0.0) xi_vals = np.sort(xi_vals) X, XI = np.meshgrid(x_vals, xi_vals, indexing='ij') symbol_vals = self.p_func(X, XI) elif self.dim == 2: x_vals, y_vals = x_grid xi_vals, eta_vals = xi_grid # Spatial resampling if len(x_vals) > RESAMPLE_SIZE: x_vals = np.linspace(x_vals.min(), x_vals.max(), RESAMPLE_SIZE) if len(y_vals) > RESAMPLE_SIZE: y_vals = np.linspace(y_vals.min(), y_vals.max(), RESAMPLE_SIZE) # Frequency resampling - ensure zero is included if len(xi_vals) > RESAMPLE_SIZE: xi_vals = np.linspace(xi_vals.min(), xi_vals.max(), RESAMPLE_SIZE) if len(eta_vals) > RESAMPLE_SIZE: eta_vals = np.linspace(eta_vals.min(), eta_vals.max(), RESAMPLE_SIZE) if 0 not in xi_vals: xi_vals = np.append(xi_vals, 0.0) xi_vals = np.sort(xi_vals) if 0 not in eta_vals: eta_vals = np.append(eta_vals, 0.0) eta_vals = np.sort(eta_vals) X, Y, XI, ETA = np.meshgrid(x_vals, y_vals, xi_vals, eta_vals, indexing='ij') symbol_vals = self.p_func(X, Y, XI, ETA) min_abs_val = np.min(np.abs(symbol_vals)) return min_abs_val > threshold
[docs] def is_self_adjoint(self, tol=1e-10): """ Check whether the pseudo-differential operator is formally self-adjoint (Hermitian). A self-adjoint operator satisfies P = P*, where P* is the formal adjoint of P. This property is essential for ensuring real-valued eigenvalues and stable evolution in quantum mechanics and symmetric wave propagation. Parameters ---------- tol : float Tolerance for symbolic comparison between P and P*. Small numerical differences below this threshold are considered equal. Returns ------- bool True if the symbol p(x, ξ) equals its formal adjoint p*(x, ξ) within the given tolerance, indicating that the operator is self-adjoint. Notes: - The formal adjoint is computed via conjugation and asymptotic expansion at infinity in ξ. - Symbolic simplification is used to verify equality, ensuring robustness against superficial expression differences. """ p = self.symbol p_star = self.formal_adjoint() return simplify(p - p_star).equals(0)
[docs] def visualize_fiber(self, x_grid, xi_grid, x0=0.0, y0=0.0): """ Plot the cotangent fiber structure at a fixed spatial point (x₀[, y₀]). This visualization shows how the symbol p(x, ξ) behaves on the cotangent fiber above a fixed spatial point. In microlocal analysis, this provides insight into the frequency content of the operator at that location. Parameters ---------- x_grid : ndarray Spatial grid values (1D) for evaluation in 1D case. xi_grid : ndarray Frequency grid values (1D) for evaluation in both 1D and 2D cases. x0 : float, optional Fixed x-coordinate of the base point in space (1D or 2D). y0 : float, optional Fixed y-coordinate of the base point in space (2D only). Notes ----- - In 1D: Displays |p(x, ξ)| over the (x, ξ) phase plane near the fixed point. - In 2D: Fixes (x₀, y₀) and evaluates p(x₀, y₀, ξ, η), showing the fiber over that point. - The color map represents the magnitude of the symbol, highlighting regions where it vanishes or becomes singular. Raises ------ NotImplementedError If called in 2D with missing or improperly formatted grids. """ if self.dim == 1: X, XI = np.meshgrid(x_grid, xi_grid, indexing='ij') symbol_vals = self.p_func(X, XI) plt.contourf(X, XI, np.abs(symbol_vals), levels=50, cmap='viridis') plt.colorbar(label='|Symbol|') plt.xlabel('x (position)') plt.ylabel('ξ (frequency)') plt.title('Cotangent Fiber Structure') plt.show() elif self.dim == 2: xi_grid2, eta_grid2 = np.meshgrid(xi_grid, xi_grid) symbol_vals = self.p_func(x0, y0, xi_grid2, eta_grid2) plt.contourf(xi_grid, xi_grid, np.abs(symbol_vals), levels=50, cmap='viridis') plt.colorbar(label='|Symbol|') plt.xlabel('ξ') plt.ylabel('η') plt.title(f'Cotangent Fiber at x={x0}, y={y0}') plt.show()
[docs] def visualize_symbol_amplitude(self, x_grid, xi_grid, y_grid=None, eta_grid=None, xi0=0.0, eta0=0.0): """ Display the modulus |p(x, ξ)| or |p(x, y, ξ₀, η₀)| as a color map. This method visualizes the amplitude of the pseudodifferential operator's symbol in either 1D or 2D spatial configuration. In 2D, the frequency variables are fixed to specified values (ξ₀, η₀) for visualization purposes. Parameters ---------- x_grid, y_grid : ndarray Spatial grids over which to evaluate the symbol. y_grid is optional and used only in 2D. xi_grid, eta_grid : ndarray Frequency grids. In 2D, these define the domain over which the symbol is evaluated, but the visualization fixes ξ = ξ₀ and η = η₀. xi0, eta0 : float, optional Fixed frequency values for slicing in 2D visualization. Defaults to zero. Notes ----- - In 1D: Visualizes |p(x, ξ)| over the (x, ξ) grid. - In 2D: Visualizes |p(x, y, ξ₀, η₀)| at fixed frequencies ξ₀ and η₀. - The color intensity represents the magnitude of the symbol, highlighting regions where the symbol is large or small. """ if self.dim == 1: X, XI = np.meshgrid(x_grid, xi_grid, indexing='ij') symbol_vals = self.p_func(X, XI) plt.pcolormesh(X, XI, np.abs(symbol_vals), shading='auto') plt.colorbar(label='|Symbol|') plt.xlabel('x') plt.ylabel('ξ') plt.title('Symbol Amplitude |p(x, ξ)|') plt.show() elif self.dim == 2: X, Y = np.meshgrid(x_grid, y_grid, indexing='ij') XI = np.full_like(X, xi0) ETA = np.full_like(Y, eta0) symbol_vals = self.p_func(X, Y, XI, ETA) plt.pcolormesh(X, Y, np.abs(symbol_vals), shading='auto') plt.colorbar(label='|Symbol|') plt.xlabel('x') plt.ylabel('y') plt.title(f'Symbol Amplitude at ξ={xi0}, η={eta0}') plt.show()
[docs] def visualize_phase(self, x_grid, xi_grid, y_grid=None, eta_grid=None, xi0=0.0, eta0=0.0): """ Plot the phase (argument) of the pseudodifferential operator's symbol p(x, ξ) or p(x, y, ξ, η). This visualization helps in understanding the oscillatory behavior and regularity properties of the operator in phase space. The phase is displayed modulo 2π using a cyclic colormap ('twilight') to emphasize its periodic nature. Parameters ---------- x_grid : ndarray 1D array of spatial coordinates (x). xi_grid : ndarray 1D array of frequency coordinates (ξ). y_grid : ndarray, optional 2D spatial grid for y-coordinate (in 2D problems). Default is None. eta_grid : ndarray, optional 2D frequency grid for η (in 2D problems). Not used directly but kept for API consistency. xi0 : float, optional Fixed value of ξ for slicing in 2D visualization. Default is 0.0. eta0 : float, optional Fixed value of η for slicing in 2D visualization. Default is 0.0. Notes: - In 1D: Displays arg(p(x, ξ)) over the (x, ξ) phase plane. - In 2D: Displays arg(p(x, y, ξ₀, η₀)) for fixed frequency values (ξ₀, η₀). - Uses plt.pcolormesh with 'twilight' colormap to represent angles from -π to π. Raises: - NotImplementedError: If the spatial dimension is not 1D or 2D. """ if self.dim == 1: X, XI = np.meshgrid(x_grid, xi_grid, indexing='ij') symbol_vals = self.p_func(X, XI) plt.pcolormesh(X, XI, np.angle(symbol_vals), shading='auto', cmap='twilight') plt.colorbar(label='arg(Symbol) [rad]') plt.xlabel('x') plt.ylabel('ξ') plt.title('Phase Portrait (arg p(x, ξ))') plt.show() elif self.dim == 2: X, Y = np.meshgrid(x_grid, y_grid, indexing='ij') XI = np.full_like(X, xi0) ETA = np.full_like(Y, eta0) symbol_vals = self.p_func(X, Y, XI, ETA) plt.pcolormesh(X, Y, np.angle(symbol_vals), shading='auto', cmap='twilight') plt.colorbar(label='arg(Symbol) [rad]') plt.xlabel('x') plt.ylabel('y') plt.title(f'Phase Portrait at ξ={xi0}, η={eta0}') plt.show()
[docs] def visualize_characteristic_set(self, x_grid, xi_grid, y_grid=None, eta_grid=None, y0=0.0, x0=0.0, levels=[1e-1]): """ Visualize the characteristic set of the pseudo-differential symbol, defined as the approximate zero set p(x, ξ) ≈ 0. In microlocal analysis, the characteristic set is the locus of points in phase space (x, ξ) where the symbol p(x, ξ) vanishes, playing a key role in understanding propagation of singularities. Parameters ---------- x_grid : ndarray Spatial grid values (1D array) for plotting in 1D or evaluation point in 2D. xi_grid : ndarray Frequency variable grid values (1D array) used to construct the frequency domain. x0 : float, optional Fixed spatial coordinate in 2D case for evaluating the symbol at a specific x position. y0 : float, optional Fixed spatial coordinate in 2D case for evaluating the symbol at a specific y position. Notes ----- - For 1D, this method plots the contour of |p(x, ξ)| = ε with ε = 1e-5 over the (x, ξ) plane. - For 2D, it evaluates the symbol at fixed (x₀, y₀) and plots the characteristic set in the (ξ, η) frequency plane. - This visualization helps identify directions of degeneracy or hypoellipticity of the operator. Raises ------ NotImplementedError If called on a solver with dimensionality other than 1D or 2D. Displays ------ A matplotlib contour plot showing either: - The characteristic curve in the (x, ξ) phase plane (1D), - The characteristic surface slice in the (ξ, η) frequency plane at (x₀, y₀) (2D). """ if self.dim == 1: x_grid = np.asarray(x_grid) xi_grid = np.asarray(xi_grid) X, XI = np.meshgrid(x_grid, xi_grid, indexing='ij') symbol_vals = self.p_func(X, XI) plt.contour(X, XI, np.abs(symbol_vals), levels=levels, colors='red') plt.xlabel('x') plt.ylabel('ξ') plt.title('Characteristic Set (p(x, ξ) ≈ 0)') plt.grid(True) plt.show() elif self.dim == 2: if eta_grid is None: raise ValueError("eta_grid must be provided for 2D visualization.") xi_grid = np.asarray(xi_grid) eta_grid = np.asarray(eta_grid) xi_grid2, eta_grid2 = np.meshgrid(xi_grid, eta_grid, indexing='ij') symbol_vals = self.p_func(x0, y0, xi_grid2, eta_grid2) plt.contour(xi_grid, eta_grid, np.abs(symbol_vals), levels=levels, colors='red') plt.xlabel('ξ') plt.ylabel('η') plt.title(f'Characteristic Set at x={x0}, y={y0}') plt.grid(True) plt.show() else: raise NotImplementedError("Only 1D/2D characteristic sets supported.")
[docs] def visualize_characteristic_gradient(self, x_grid, xi_grid, y_grid=None, eta_grid=None, y0=0.0, x0=0.0): """ Visualize the norm of the gradient of the symbol in phase space. This method computes the magnitude of the gradient |∇p| of a pseudo-differential symbol p(x, ξ) in 1D or p(x, y, ξ, η) in 2D. The resulting colormap reveals regions where the symbol varies rapidly or remains nearly stationary, which is particularly useful for analyzing characteristic sets. Parameters ---------- x_grid : numpy.ndarray 1D array of spatial coordinates for the x-direction. xi_grid : numpy.ndarray 1D array of frequency coordinates (ξ). y_grid : numpy.ndarray, optional 1D array of spatial coordinates for the y-direction (used in 2D mode). Default is None. eta_grid : numpy.ndarray, optional 1D array of frequency coordinates (η) for the 2D case. Default is None. x0 : float, optional Fixed x-coordinate for evaluating the symbol in 2D. Default is 0.0. y0 : float, optional Fixed y-coordinate for evaluating the symbol in 2D. Default is 0.0. Returns ------- None Displays a 2D colormap of |∇p| over the relevant phase-space domain. Notes ----- - In 1D, the full gradient ∇p = (∂ₓp, ∂ξp) is computed over the (x, ξ) grid. - In 2D, the gradient ∇p = (∂ξp, ∂ηp) is computed at a fixed spatial point (x₀, y₀) over the (ξ, η) grid. - Numerical differentiation is performed using `np.gradient`. - High values of |∇p| indicate rapid variation of the symbol, while low values typically suggest characteristic regions. """ if self.dim == 1: X, XI = np.meshgrid(x_grid, xi_grid, indexing='ij') symbol_vals = self.p_func(X, XI) grad_x = np.gradient(symbol_vals, axis=0) grad_xi = np.gradient(symbol_vals, axis=1) grad_norm = np.sqrt(grad_x**2 + grad_xi**2) plt.pcolormesh(X, XI, grad_norm, cmap='inferno', shading='auto') plt.colorbar(label='|∇p|') plt.xlabel('x') plt.ylabel('ξ') plt.title('Gradient Norm (High Near Zeros)') plt.grid(True) plt.show() elif self.dim == 2: xi_grid2, eta_grid2 = np.meshgrid(xi_grid, eta_grid, indexing='ij') symbol_vals = self.p_func(x0, y0, xi_grid2, eta_grid2) grad_xi = np.gradient(symbol_vals, axis=0) grad_eta = np.gradient(symbol_vals, axis=1) grad_norm = np.sqrt(np.abs(grad_xi)**2 + np.abs(grad_eta)**2) plt.pcolormesh(xi_grid, eta_grid, grad_norm, cmap='inferno', shading='auto') plt.colorbar(label='|∇p|') plt.xlabel('ξ') plt.ylabel('η') plt.title(f'Gradient Norm at x={x0}, y={y0}') plt.grid(True) plt.show()
[docs] def plot_hamiltonian_flow(self, x0=0.0, xi0=5.0, y0=0.0, eta0=0.0, tmax=1.0, n_steps=100, show_field=True): """ Integrate and plot the Hamiltonian trajectories of the symbol in phase space. This method numerically integrates the Hamiltonian vector field derived from the operator's symbol to visualize how singularities propagate under the flow. It supports both 1D and 2D problems. Parameters ---------- x0, xi0 : float Initial position and frequency (momentum) in 1D. y0, eta0 : float, optional Initial position and frequency in 2D; defaults to zero. tmax : float Final integration time for the ODE solver. n_steps : int Number of time steps used in the integration. Notes ----- - The Hamiltonian vector field is obtained from the symplectic flow of the symbol. - If the field is complex-valued, only its real part is used for integration. - In 1D, the trajectory is plotted in (x, ξ) phase space. - In 2D, the spatial trajectory (x(t), y(t)) is shown along with instantaneous momentum vectors (ξ(t), η(t)) using a quiver plot. Raises ------ NotImplementedError If the spatial dimension is not 1D or 2D. Displays -------- matplotlib plot Phase space trajectory(ies) showing the evolution of position and momentum under the Hamiltonian dynamics. """ def make_real(expr): from sympy import re, simplify expr = expr.doit(deep=True) return simplify(re(expr)) H = self.symplectic_flow() if any(im(H[k]) != 0 for k in H): print("⚠️ The Hamiltonian field is complex. Only the real part is used for integration.") if self.dim == 1: x, = self.vars_x xi = symbols('xi', real=True) dxdt_expr = make_real(H['dx/dt']) dxidt_expr = make_real(H['dxi/dt']) dxdt = lambdify((x, xi), dxdt_expr, 'numpy') dxidt = lambdify((x, xi), dxidt_expr, 'numpy') def hamilton(t, Y): x, xi = Y return [dxdt(x, xi), dxidt(x, xi)] sol = solve_ivp(hamilton, [0, tmax], [x0, xi0], t_eval=np.linspace(0, tmax, n_steps)) if sol.status != 0: print(f"⚠️ Integration warning: {sol.message}") n_points = sol.y.shape[1] if n_points < n_steps: print(f"⚠️ Only {n_points} frames computed. Adjusting animation.") n_steps = n_points x_vals, xi_vals = sol.y plt.plot(x_vals, xi_vals) plt.xlabel("x") plt.ylabel("ξ") plt.title("Hamiltonian Flow in Phase Space (1D)") plt.grid(True) plt.show() elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) dxdt = lambdify((x, y, xi, eta), make_real(H['dx/dt']), 'numpy') dydt = lambdify((x, y, xi, eta), make_real(H['dy/dt']), 'numpy') dxidt = lambdify((x, y, xi, eta), make_real(H['dxi/dt']), 'numpy') detadt = lambdify((x, y, xi, eta), make_real(H['deta/dt']), 'numpy') def hamilton(t, Y): x, y, xi, eta = Y return [ dxdt(x, y, xi, eta), dydt(x, y, xi, eta), dxidt(x, y, xi, eta), detadt(x, y, xi, eta) ] sol = solve_ivp(hamilton, [0, tmax], [x0, y0, xi0, eta0], t_eval=np.linspace(0, tmax, n_steps)) if sol.status != 0: print(f"⚠️ Integration warning: {sol.message}") n_points = sol.y.shape[1] if n_points < n_steps: print(f"⚠️ Only {n_points} frames computed. Adjusting animation.") n_steps = n_points x_vals, y_vals, xi_vals, eta_vals = sol.y plt.plot(x_vals, y_vals, label='Position') plt.quiver(x_vals, y_vals, xi_vals, eta_vals, scale=20, width=0.003, alpha=0.5, color='r') # Vector field of the flow (optional) if show_field: X, Y = np.meshgrid(np.linspace(min(x_vals), max(x_vals), 20), np.linspace(min(y_vals), max(y_vals), 20)) XI, ETA = xi0 * np.ones_like(X), eta0 * np.ones_like(Y) U = dxdt(X, Y, XI, ETA) V = dydt(X, Y, XI, ETA) plt.quiver(X, Y, U, V, color='gray', alpha=0.2, scale=30, width=0.002) plt.xlabel("x") plt.ylabel("y") plt.title("Hamiltonian Flow in Phase Space (2D)") plt.legend() plt.grid(True) plt.axis('equal') plt.show()
[docs] def plot_symplectic_vector_field(self, xlim=(-2, 2), klim=(-5, 5), density=30): """ Visualize the symplectic vector field (Hamiltonian vector field) associated with the operator's symbol. The plotted vector field corresponds to (∂_ξ p, -∂_x p), where p(x, ξ) is the principal symbol of the pseudo-differential operator. This field governs the bicharacteristic flow in phase space. Parameters ---------- xlim : tuple of float Range for spatial variable x, as (x_min, x_max). klim : tuple of float Range for frequency variable ξ, as (ξ_min, ξ_max). density : int Number of grid points per axis for the visualization grid. Raises ------ NotImplementedError If called on a 2D operator (currently only 1D implementation available). Notes ----- - Only supports one-dimensional operators. - Uses symbolic differentiation to compute ∂_ξ p and ∂_x p. - Numerical evaluation is done via lambdify with NumPy backend. - Visualization uses matplotlib quiver plot to show vector directions. """ x_vals = np.linspace(*xlim, density) xi_vals = np.linspace(*klim, density) X, XI = np.meshgrid(x_vals, xi_vals, indexing='ij') if self.dim != 1: raise NotImplementedError("Only 1D version implemented.") x, = self.vars_x xi = symbols('xi', real=True) H = self.symplectic_flow() dxdt = lambdify((x, xi), simplify(H['dx/dt']), 'numpy') dxidt = lambdify((x, xi), simplify(H['dxi/dt']), 'numpy') U = dxdt(X, XI) V = dxidt(X, XI) plt.quiver(X, XI, U, V, scale=10, width=0.005) plt.xlabel('x') plt.ylabel(r'$\xi$') plt.title("Symplectic Vector Field (1D)") plt.grid(True) plt.show()
[docs] def visualize_micro_support(self, xlim=(-2, 2), klim=(-10, 10), threshold=1e-3, density=300): """ Visualize the micro-support of the operator by plotting the inverse of the symbol magnitude 1 / |p(x, ξ)|. The micro-support provides insight into the singularities of a pseudo-differential operator in phase space (x, ξ). Regions where |p(x, ξ)| is small correspond to large values in 1/|p(x, ξ)|, highlighting areas of significant operator influence or singularity. Parameters ---------- xlim : tuple Spatial domain limits (x_min, x_max). klim : tuple Frequency domain limits (ξ_min, ξ_max). threshold : float Threshold below which |p(x, ξ)| is considered effectively zero; used for numerical stability. density : int Number of grid points along each axis for visualization resolution. Raises ------ NotImplementedError If called on a solver with dimension greater than 1 (only 1D visualization is supported). Notes ----- - This method evaluates the symbol p(x, ξ) over a grid and plots its reciprocal to emphasize regions where the symbol is near zero. - A small constant (1e-10) is added to the denominator to avoid division by zero. - The resulting plot helps identify characteristic sets. """ if self.dim != 1: raise NotImplementedError("Only 1D micro-support visualization implemented.") x_vals = np.linspace(*xlim, density) xi_vals = np.linspace(*klim, density) X, XI = np.meshgrid(x_vals, xi_vals, indexing='ij') Z = np.abs(self.p_func(X, XI)) plt.contourf(X, XI, 1 / (Z + 1e-10), levels=100, cmap='inferno') plt.colorbar(label=r'$1/|p(x,\xi)|$') plt.xlabel('x') plt.ylabel(r'$\xi$') plt.title("Micro-Support Estimate (1/|Symbol|)") plt.show()
[docs] def group_velocity_field(self, xlim=(-2, 2), klim=(-10, 10), density=30): """ Plot the group velocity field ∇_ξ p(x, ξ) for 1D pseudo-differential operators. The group velocity represents the speed at which waves of different frequencies propagate in a dispersive medium. It is defined as the gradient of the symbol p(x, ξ) with respect to the frequency variable ξ. Parameters ---------- xlim : tuple of float Spatial domain limits (x-axis). klim : tuple of float Frequency domain limits (ξ-axis). density : int Number of grid points per axis used for visualization. Raises ------ NotImplementedError If called on a 2D operator, since this visualization is only implemented for 1D. Notes ----- - This method visualizes the vector field (∂p/∂ξ) in phase space. - Used for analyzing wave propagation properties and dispersion relations. - Requires symbolic expression self.expr depending on x and ξ. """ if self.dim != 1: raise NotImplementedError("Only 1D group velocity visualization implemented.") x, = self.vars_x xi = symbols('xi', real=True) dp_dxi = diff(self.symbol, xi) grad_func = lambdify((x, xi), dp_dxi, 'numpy') x_vals = np.linspace(*xlim, density) xi_vals = np.linspace(*klim, density) X, XI = np.meshgrid(x_vals, xi_vals, indexing='ij') V = grad_func(X, XI) plt.quiver(X, XI, np.ones_like(V), V, scale=10, width=0.004) plt.xlabel('x') plt.ylabel(r'$\xi$') plt.title("Group Velocity Field (1D)") plt.grid(True) plt.show()
[docs] def animate_singularity(self, xi0=5.0, eta0=0.0, x0=0.0, y0=0.0, tmax=4.0, n_frames=100, projection=None): """ Animate the propagation of a singularity under the Hamiltonian flow. This method visualizes how a singularity (x₀, y₀, ξ₀, η₀) evolves in phase space according to the Hamiltonian dynamics induced by the principal symbol of the operator. The animation integrates the Hamiltonian equations of motion and supports various projections: position (x-y), frequency (ξ-η), or mixed phase space coordinates. Parameters ---------- xi0, eta0 : float Initial frequency components (ξ₀, η₀). x0, y0 : float Initial spatial coordinates (x₀, y₀). tmax : float Total time of integration (final animation time). n_frames : int Number of frames in the resulting animation. projection : str or None Type of projection to display: - 'position' : x vs y (or x alone in 1D) - 'frequency': ξ vs η (or ξ alone in 1D) - 'phase' : mixed coordinates like x vs ξ or x vs η If None, defaults to 'phase' in 1D and 'position' in 2D. Returns ------- matplotlib.animation.FuncAnimation Animation object that can be displayed interactively in Jupyter notebooks or saved as a video. Notes ----- - In 1D, only one spatial and one frequency variable are used. - Complex-valued Hamiltonian fields are truncated to their real parts for integration. - Trajectories are shown with both instantaneous position (dot) and full path (dashed line). """ rc('animation', html='jshtml') def make_real(expr): from sympy import re, simplify expr = expr.doit(deep=True) return simplify(re(expr)) H = self.symplectic_flow() H = {k: v.doit(deep=True) for k, v in H.items()} print("H = ", H) if any(im(H[k]) != 0 for k in H): print("⚠️ The Hamiltonian field is complex. Only the real part is used for integration.") if self.dim == 1: x, = self.vars_x xi = symbols('xi', real=True) dxdt = lambdify((x, xi), make_real(H['dx/dt']), 'numpy') dxidt = lambdify((x, xi), make_real(H['dxi/dt']), 'numpy') def hamilton(t, Y): x, xi = Y return [dxdt(x, xi), dxidt(x, xi)] sol = solve_ivp(hamilton, [0, tmax], [x0, xi0], t_eval=np.linspace(0, tmax, n_frames)) if sol.status != 0: print(f"⚠️ Integration warning: {sol.message}") n_points = sol.y.shape[1] if n_points < n_frames: print(f"⚠️ Only {n_points} frames computed. Adjusting animation.") n_frames = n_points x_vals, xi_vals = sol.y if projection is None: projection = 'phase' fig, ax = plt.subplots() point, = ax.plot([], [], 'ro') traj, = ax.plot([], [], 'b--', lw=1, alpha=0.5) if projection == 'phase': ax.set_xlabel('x') ax.set_ylabel(r'$\xi$') ax.set_xlim(np.min(x_vals) - 1, np.max(x_vals) + 1) ax.set_ylim(np.min(xi_vals) - 1, np.max(xi_vals) + 1) def update(i): point.set_data([x_vals[i]], [xi_vals[i]]) traj.set_data(x_vals[:i+1], xi_vals[:i+1]) return point, traj elif projection == 'position': ax.set_xlabel('x') ax.set_ylabel('x') ax.set_xlim(np.min(x_vals) - 1, np.max(x_vals) + 1) ax.set_ylim(np.min(x_vals) - 1, np.max(x_vals) + 1) def update(i): point.set_data([x_vals[i]], [x_vals[i]]) traj.set_data(x_vals[:i+1], x_vals[:i+1]) return point, traj elif projection == 'frequency': ax.set_xlabel(r'$\xi$') ax.set_ylabel(r'$\xi$') ax.set_xlim(np.min(xi_vals) - 1, np.max(xi_vals) + 1) ax.set_ylim(np.min(xi_vals) - 1, np.max(xi_vals) + 1) def update(i): point.set_data([xi_vals[i]], [xi_vals[i]]) traj.set_data(xi_vals[:i+1], xi_vals[:i+1]) return point, traj else: raise ValueError("Invalid projection mode") ax.set_title(f"1D Singularity Flow ({projection})") ax.grid(True) ani = animation.FuncAnimation(fig, update, frames=n_frames, interval=50) plt.close(fig) return ani elif self.dim == 2: x, y = self.vars_x xi, eta = symbols('xi eta', real=True) dxdt = lambdify((x, y, xi, eta), make_real(H['dx/dt']), 'numpy') dydt = lambdify((x, y, xi, eta), make_real(H['dy/dt']), 'numpy') dxidt = lambdify((x, y, xi, eta), make_real(H['dxi/dt']), 'numpy') detadt = lambdify((x, y, xi, eta), make_real(H['deta/dt']), 'numpy') def hamilton(t, Y): x, y, xi, eta = Y return [ dxdt(x, y, xi, eta), dydt(x, y, xi, eta), dxidt(x, y, xi, eta), detadt(x, y, xi, eta) ] sol = solve_ivp(hamilton, [0, tmax], [x0, y0, xi0, eta0], t_eval=np.linspace(0, tmax, n_frames)) if sol.status != 0: print(f"⚠️ Integration warning: {sol.message}") n_points = sol.y.shape[1] if n_points < n_frames: print(f"⚠️ Only {n_points} frames computed. Adjusting animation.") n_frames = n_points x_vals, y_vals, xi_vals, eta_vals = sol.y if projection is None: projection = 'position' fig, ax = plt.subplots() point, = ax.plot([], [], 'ro') traj, = ax.plot([], [], 'b--', lw=1, alpha=0.5) if projection == 'position': ax.set_xlabel('x') ax.set_ylabel('y') ax.set_xlim(np.min(x_vals) - 1, np.max(x_vals) + 1) ax.set_ylim(np.min(y_vals) - 1, np.max(y_vals) + 1) def update(i): point.set_data([x_vals[i]], [y_vals[i]]) traj.set_data(x_vals[:i+1], y_vals[:i+1]) return point, traj elif projection == 'frequency': ax.set_xlabel(r'$\xi$') ax.set_ylabel(r'$\eta$') ax.set_xlim(np.min(xi_vals) - 1, np.max(xi_vals) + 1) ax.set_ylim(np.min(eta_vals) - 1, np.max(eta_vals) + 1) def update(i): point.set_data([xi_vals[i]], [eta_vals[i]]) traj.set_data(xi_vals[:i+1], eta_vals[:i+1]) return point, traj elif projection == 'phase': ax.set_xlabel('x') ax.set_ylabel(r'$\eta$') ax.set_xlim(np.min(x_vals) - 1, np.max(x_vals) + 1) ax.set_ylim(np.min(eta_vals) - 1, np.max(eta_vals) + 1) def update(i): point.set_data([x_vals[i]], [eta_vals[i]]) traj.set_data(x_vals[:i+1], eta_vals[:i+1]) return point, traj else: raise ValueError("Invalid projection mode") ax.set_title(f"2D Singularity Flow ({projection})") ax.grid(True) ax.axis('equal') ani = animation.FuncAnimation(fig, update, frames=n_frames, interval=50) plt.close(fig) return ani
[docs] def interactive_symbol_analysis(pseudo_op, xlim=(-2, 2), ylim=(-2, 2), xi_range=(0.1, 5), eta_range=(-5, 5), density=50): """ Launch an interactive dashboard for symbol exploration using ipywidgets. This function provides a user-friendly interface to visualize various aspects of the pseudo-differential operator's symbol. It supports multiple visualization modes in both 1D and 2D, including group velocity fields, micro-support estimates, symplectic vector fields, symbol amplitude/phase, cotangent fiber structure, characteristic sets and Hamiltonian flows. Parameters ---------- pseudo_op : PseudoDifferentialOperator The pseudo-differential operator whose symbol is to be analyzed interactively. xlim, ylim : tuple of float Spatial domain limits along x and y axes respectively. xi_range, eta_range : tuple Frequency domain limits along ξ and η axes respectively. density : int Number of points per axis used to construct the evaluation grid. Controls resolution. Notes ----- - In 1D mode, sliders control the fixed frequency (ξ₀) and spatial position (x₀). - In 2D mode, additional sliders control the second frequency component (η₀) and second spatial coordinate (y₀). - Visualization updates dynamically as parameters are adjusted via sliders or dropdown menus. - Supported visualization modes: 'Symbol Amplitude' : |p(x,ξ)| or |p(x,y,ξ,η)| 'Symbol Phase' : arg(p(x,ξ)) or similar in 2D 'Micro-Support (1/|p|)' : Reciprocal of symbol magnitude 'Cotangent Fiber' : Structure of symbol over frequency space at fixed x 'Characteristic Set' : Zero set approximation {p ≈ 0} 'Characteristic Gradient' : |∇p(x, ξ)| or |∇p(x₀, y₀, ξ, η)| 'Group Velocity Field' : ∇_ξ p(x,ξ) or ∇_{ξ,η} p(x,y,ξ,η) 'Symplectic Vector Field' : (∇_ξ p, -∇_x p) or similar in 2D 'Hamiltonian Flow' : Trajectories generated by the Hamiltonian vector field Raises ------ NotImplementedError If the spatial dimension is not 1D or 2D. Prints ------ Interactive matplotlib figures with dynamic updates based on widget inputs. """ dim = pseudo_op.dim expr = pseudo_op.expr vars_x = pseudo_op.vars_x mode_selector_1D = Dropdown( options=[ 'Symbol Amplitude', 'Symbol Phase', 'Micro-Support (1/|p|)', 'Cotangent Fiber', 'Characteristic Set', 'Characteristic Gradient', 'Group Velocity Field', 'Symplectic Vector Field', 'Hamiltonian Flow', ], value='Symbol Amplitude', description='Mode:' ) mode_selector_2D = Dropdown( options=[ 'Symbol Amplitude', 'Symbol Phase', 'Micro-Support (1/|p|)', 'Cotangent Fiber', 'Characteristic Set', 'Characteristic Gradient', 'Symplectic Vector Field', 'Hamiltonian Flow', ], value='Symbol Amplitude', description='Mode:' ) x_vals = np.linspace(*xlim, density) if dim == 2: y_vals = np.linspace(*ylim, density) if dim == 1: x, = vars_x xi = symbols('xi', real=True) grad_func = lambdify((x, xi), diff(expr, xi), 'numpy') symplectic_func = lambdify((x, xi), [diff(expr, xi), -diff(expr, x)], 'numpy') symbol_func = lambdify((x, xi), expr, 'numpy') xi_slider = FloatSlider(min=xi_range[0], max=xi_range[1], step=0.1, value=1.0, description='ξ₀') x_slider = FloatSlider(min=xlim[0], max=xlim[1], step=0.1, value=0.0, description='x₀') def plot_1d(mode, xi0, x0): X = x_vals[:, None] if mode == 'Group Velocity Field': V = grad_func(X, xi0) plt.quiver(X, V, np.ones_like(V), V, scale=10, width=0.004) plt.xlabel('x') plt.title(f'Group Velocity Field at ξ={xi0:.2f}') elif mode == 'Micro-Support (1/|p|)': Z = 1 / (np.abs(symbol_func(X, xi0)) + 1e-10) plt.plot(x_vals, Z) plt.xlabel('x') plt.title(f'Micro-Support (1/|p|) at ξ={xi0:.2f}') elif mode == 'Symplectic Vector Field': U, V = symplectic_func(X, xi0) plt.quiver(X, V, U, V, scale=10, width=0.004) plt.xlabel('x') plt.title(f'Symplectic Field at ξ={xi0:.2f}') elif mode == 'Symbol Amplitude': Z = np.abs(symbol_func(X, xi0)) plt.plot(x_vals, Z) plt.xlabel('x') plt.title(f'Symbol Amplitude |p(x,ξ)| at ξ={xi0:.2f}') elif mode == 'Symbol Phase': Z = np.angle(symbol_func(X, xi0)) plt.plot(x_vals, Z) plt.xlabel('x') plt.title(f'Symbol Phase arg(p(x,ξ)) at ξ={xi0:.2f}') elif mode == 'Cotangent Fiber': pseudo_op.visualize_fiber(x_vals, np.linspace(*xi_range, density), x0=x0) elif mode == 'Characteristic Set': pseudo_op.visualize_characteristic_set(x_vals, np.linspace(*xi_range, density), x0=x0) elif mode == 'Characteristic Gradient': pseudo_op.visualize_characteristic_gradient(x_vals, np.linspace(*xi_range, density), x0=x0) elif mode == 'Hamiltonian Flow': pseudo_op.plot_hamiltonian_flow(x0=x0, xi0=xi0) # --- Dynamic container for sliders --- controls_box = VBox([mode_selector_1D, xi_slider, x_slider]) # --- Function to adjust visible sliders based on mode --- def update_controls(change): mode = change['new'] # modes that depend only on xi and eta if mode in ['Symbol Amplitude', 'Symbol Phase', 'Micro-Support (1/|p|)', 'Group Velocity Field', 'Symplectic Vector Field']: controls_box.children = [mode_selector_1D, xi_slider] # modes that require xi and x elif mode in ['Hamiltonian Flow']: controls_box.children = [mode_selector_1D, xi_slider, x_slider] # modes that require nothing elif mode in ['Cotangent Fiber', 'Characteristic Set', 'Characteristic Gradient']: controls_box.children = [mode_selector_1D] mode_selector_1D.observe(update_controls, names='value') update_controls({'new': mode_selector_1D.value}) # --- Interactive binding --- out = interactive_output(plot_1d, {'mode': mode_selector_1D, 'xi0': xi_slider, 'x0': x_slider}) display(VBox([controls_box, out])) elif dim == 2: x, y = vars_x xi, eta = symbols('xi eta', real=True) symplectic_func = lambdify((x, y, xi, eta), [diff(expr, xi), diff(expr, eta)], 'numpy') symbol_func = lambdify((x, y, xi, eta), expr, 'numpy') xi_slider=FloatSlider(min=xi_range[0], max=xi_range[1], step=0.1, value=1.0, description='ξ₀') eta_slider=FloatSlider(min=eta_range[0], max=eta_range[1], step=0.1, value=1.0, description='η₀') x_slider=FloatSlider(min=xlim[0], max=xlim[1], step=0.1, value=0.0, description='x₀') y_slider=FloatSlider(min=ylim[0], max=ylim[1], step=0.1, value=0.0, description='y₀') def plot_2d(mode, xi0, eta0, x0, y0): X, Y = np.meshgrid(x_vals, y_vals, indexing='ij') if mode == 'Micro-Support (1/|p|)': Z = 1 / (np.abs(symbol_func(X, Y, xi0, eta0)) + 1e-10) plt.pcolormesh(X, Y, Z, shading='auto', cmap='inferno') plt.colorbar(label='1/|p|') plt.xlabel('x') plt.ylabel('y') plt.title(f'Micro-Support at ξ={xi0:.2f}, η={eta0:.2f}') elif mode == 'Symplectic Vector Field': U, V = symplectic_func(X, Y, xi0, eta0) plt.quiver(X, Y, U, V, scale=10, width=0.004) plt.xlabel('x') plt.ylabel('y') plt.title(f'Symplectic Field at ξ={xi0:.2f}, η={eta0:.2f}') elif mode == 'Symbol Amplitude': Z = np.abs(symbol_func(X, Y, xi0, eta0)) plt.pcolormesh(X, Y, Z, shading='auto') plt.colorbar(label='|p(x,y,ξ,η)|') plt.xlabel('x') plt.ylabel('y') plt.title(f'Symbol Amplitude at ξ={xi0:.2f}, η={eta0:.2f}') elif mode == 'Symbol Phase': Z = np.angle(symbol_func(X, Y, xi0, eta0)) plt.pcolormesh(X, Y, Z, shading='auto', cmap='twilight') plt.colorbar(label='arg(p)') plt.xlabel('x') plt.ylabel('y') plt.title(f'Symbol Phase at ξ={xi0:.2f}, η={eta0:.2f}') elif mode == 'Cotangent Fiber': pseudo_op.visualize_fiber(np.linspace(*xi_range, density), np.linspace(*eta_range, density), x0=x0, y0=y0) elif mode == 'Characteristic Set': pseudo_op.visualize_characteristic_set(x_grid=x_vals, xi_grid=np.linspace(*xi_range, density), y_grid=y_vals, eta_grid=np.linspace(*eta_range, density), x0=x0, y0=y0) elif mode == 'Characteristic Gradient': pseudo_op.visualize_characteristic_gradient(x_grid=x_vals, xi_grid=np.linspace(*xi_range, density), y_grid=y_vals, eta_grid=np.linspace(*eta_range, density), x0=x0, y0=y0) elif mode == 'Hamiltonian Flow': pseudo_op.plot_hamiltonian_flow(x0=x0, y0=y0, xi0=xi0, eta0=eta0) # --- Dynamic container for sliders --- controls_box = VBox([mode_selector_2D, xi_slider, eta_slider, x_slider, y_slider]) # --- Function to adjust visible sliders based on mode --- def update_controls(change): mode = change['new'] # modes that depend only on xi if mode in ['Symbol Amplitude', 'Symbol Phase', 'Micro-Support (1/|p|)', 'Symplectic Vector Field']: controls_box.children = [mode_selector_2D, xi_slider, eta_slider] # modes that require xi, eta, x and y elif mode in ['Hamiltonian Flow']: controls_box.children = [mode_selector_2D, xi_slider, eta_slider, x_slider, y_slider] # modes that require x and y elif mode in ['Cotangent Fiber', 'Characteristic Set', 'Characteristic Gradient']: controls_box.children = [mode_selector_2D, x_slider, y_slider] mode_selector_2D.observe(update_controls, names='value') update_controls({'new': mode_selector_2D.value}) # --- Interactive binding --- out = interactive_output(plot_2d, {'mode': mode_selector_2D, 'xi0': xi_slider, 'eta0': eta_slider, 'x0': x_slider, 'y0': y_slider}) display(VBox([controls_box, out]))
# ============================================================================ # Standalone functions for Kohn-Nirenberg quantization # ============================================================================ # --------------------------------------------------------------------------- # Frequency-domain window helpers (reused in both functions) # ---------------------------------------------------------------------------
[docs] def freq_window_2d(P, KXb, KYb, kx_shift, ky_shift, mode): """Apply a 2-D frequency window in-place and return P.""" if mode == 'gaussian': sigma_kx = 0.8 * np.max(np.abs(kx_shift)) sigma_ky = 0.8 * np.max(np.abs(ky_shift)) P *= np.exp(-(KXb / sigma_kx) ** 4) * np.exp(-(KYb / sigma_ky) ** 4) elif mode == 'hann': Wx = 0.5 * (1 + np.cos(np.pi * KXb / np.max(np.abs(kx_shift)))) Wy = 0.5 * (1 + np.cos(np.pi * KYb / np.max(np.abs(ky_shift)))) P *= Wx * Wy * (np.abs(KXb) < np.max(np.abs(kx_shift))) \ * (np.abs(KYb) < np.max(np.abs(ky_shift))) return P
# =========================================================================== # kohn_nirenberg_fft (periodic boundary, FFT-based) # ===========================================================================
[docs] def kohn_nirenberg_fft(u_vals, symbol_func, x_grid, kx, fft_func, ifft_func, dim=1, y_grid=None, ky=None, freq_window='gaussian', clamp=1e6, space_window=False): """ Numerically stable Kohn–Nirenberg quantization of a pseudo-differential operator. Applies the pseudo-differential operator Op(p) to the function f via the Kohn–Nirenberg quantization: [Op(p)f](x) = (1/(2π)^d) ∫ p(x, ξ) e^{ix·ξ} ℱ[f](ξ) dξ where p(x, ξ) is a symbol that may depend on both spatial variables x and frequency variables ξ. This method supports both 1D and 2D cases and includes optional smoothing techniques to improve numerical stability. Parameters ---------- u_vals : np.ndarray Spatial samples of the input function f(x) or f(x, y), defined on a uniform grid. symbol_func : callable A function representing the full symbol p(x, ξ) in 1D or p(x, y, ξ, η) in 2D. Must accept NumPy-compatible array inputs and return a complex-valued array. freq_window : {'gaussian', 'hann', None}, optional Type of frequency-domain window to apply: - 'gaussian': smooth decay near high frequencies - 'hann': cosine-based tapering with hard cutoff - None: no frequency window applied clamp : float, optional Upper bound on the absolute value of the symbol. Prevents numerical blow-up from large values. space_window : bool, optional Whether to apply a spatial Gaussian window to suppress edge effects in physical space. Parameters ---------- u_vals : ndarray Input function values symbol_func : callable Symbol function p(x, ξ) or p(x, y, ξ, η) x_grid : ndarray Spatial grid in x direction kx : ndarray Frequency grid in x direction fft_func : callable FFT function (fft or fft2) ifft_func : callable Inverse FFT function dim : int Dimension (1 or 2) y_grid : ndarray, optional Spatial grid in y direction (for 2D) ky : ndarray, optional Frequency grid in y direction (for 2D) freq_window : str Windowing function ('gaussian' or 'hann') clamp : float Clamp symbol values to [-clamp, clamp] space_window : bool Apply spatial windowing Returns ------- ndarray Result of applying the operator 2-D optimizations ----------------- * The 4-D tensor (Nx, Ny, Nkx, Nky) is never fully materialized. The x-axis is sliced into row-blocks; for each block only a (block_rows, Ny, Nkx, Nky) array is needed. * Row-blocks are processed in parallel with ThreadPoolExecutor. * Phase kernel exp(i*x*kx) is factored as a pair of 2-D arrays, exploiting exp(i*(x*kx + y*ky)) = exp(i*x*kx) * exp(i*y*ky). """ # ----------------------------------------------------------------------- # 1-D (unchanged – already fast for typical Nx) # ----------------------------------------------------------------------- if dim == 1: dx = x_grid[1] - x_grid[0] Nx = len(x_grid) k = 2 * np.pi * fftshift(fftfreq(Nx, d=dx)) dk = k[1] - k[0] f_hat = fftshift(fft_func(fftshift(u_vals)) * dx) X, K = np.meshgrid(x_grid, k, indexing='ij') P = symbol_func(X, K) P = np.clip(P.astype(np.complex128), -clamp, clamp) if freq_window == 'gaussian': sigma = 0.8 * np.max(np.abs(k)) P *= np.exp(-(K / sigma) ** 4) elif freq_window == 'hann': W = 0.5 * (1 + np.cos(np.pi * K / np.max(np.abs(K)))) P *= W * (np.abs(K) < np.max(np.abs(K))) if space_window: x0 = (x_grid[0] + x_grid[-1]) / 2 L = (x_grid[-1] - x_grid[0]) / 2 P *= np.exp(-((X - x0) / L) ** 2) kernel = np.exp(1j * X * K) u = np.sum(P * f_hat[None, :] * kernel, axis=1) * dk / (2 * np.pi) return u # ----------------------------------------------------------------------- # 2-D (block-parallel, memory-efficient) # ----------------------------------------------------------------------- elif dim == 2: dx = x_grid[1] - x_grid[0] dy = y_grid[1] - y_grid[0] Nx, Ny = len(x_grid), len(y_grid) kx_s = 2 * np.pi * fftshift(fftfreq(Nx, d=dx)) # (Nkx,) ky_s = 2 * np.pi * fftshift(fftfreq(Ny, d=dy)) # (Nky,) dkx = kx_s[1] - kx_s[0] dky = ky_s[1] - ky_s[0] # Global FFT of u → f_hat shape (Nkx, Nky) f_hat = fftshift(fft_func(fftshift(u_vals)) * dx * dy) # (Nx, Ny) # Pre-factored phase components (avoid 4-D outer product) # exp(i*(x*kx + y*ky)) = exp_x[x, kx] * exp_y[y, ky] exp_y = np.exp(1j * np.outer(y_grid, ky_s)) # (Ny, Nky) # Spatial window along y (independent of x-block) if space_window: y0 = (y_grid[0] + y_grid[-1]) / 2 Ly = (y_grid[-1] - y_grid[0]) / 2 sw_y = np.exp(-((y_grid - y0) / Ly) ** 2) # (Ny,) else: sw_y = None # 2-D frequency grids (Nkx, Nky) – used for windowing KX2, KY2 = np.meshgrid(kx_s, ky_s, indexing='ij') # each (Nkx, Nky) # Row-block parameters n_workers = FFT_WORKERS base = max(1, Nx // n_workers) boundaries = [(i * base, min((i + 1) * base, Nx)) for i in range(n_workers) if i * base < Nx] result = np.zeros((Nx, Ny), dtype=np.complex128) def _process_block(bounds): """ Integrate over frequency for a horizontal slice x[i0:i1]. Memory: (block, Ny, Nkx, Nky) where block = i1 - i0. """ i0, i1 = bounds x_blk = x_grid[i0:i1] # (B,) B = i1 - i0 # ---- symbol evaluated on the block ------------------------- # Broadcast shapes: x (B,1,1,1), y (1,Ny,1,1), # kx (1,1,Nkx,1), ky (1,1,1,Nky) Xb = x_blk[:, None, None, None] Yb = y_grid[None, :, None, None] KXb = kx_s[None, None, :, None] KYb = ky_s[None, None, None, :] P = symbol_func(Xb, Yb, KXb, KYb).astype(np.complex128) P = np.clip(P, -clamp, clamp) # ---- frequency window (operates on kx/ky dims) ------------- freq_window_2d(P, KXb, KYb, kx_s, ky_s, freq_window) # ---- spatial window ---------------------------------------- if space_window: x0 = (x_grid[0] + x_grid[-1]) / 2 Lx = (x_grid[-1] - x_grid[0]) / 2 sw_x = np.exp(-((x_blk - x0) / Lx) ** 2) # (B,) P *= sw_x[:, None, None, None] if sw_y is not None: P *= sw_y[None, :, None, None] # ---- phase exp(i*(x*kx + y*ky)) --------------------------- # Factored: exp_x (B,1,Nkx,1) * exp_y (1,Ny,1,Nky) exp_x_blk = np.exp(1j * np.outer(x_blk, kx_s))\ .reshape(B, 1, len(kx_s), 1) # (B,1,Nkx,1) exp_y_blk = exp_y[None, :, None, :] # (1,Ny,1,Nky) phase = exp_x_blk * exp_y_blk # (B,Ny,Nkx,Nky) # f_hat broadcast: (1,1,Nkx,Nky) fh = f_hat[None, None, :, :] # Integrate over (kx, ky) integrand = P * fh * phase # (B,Ny,Nkx,Nky) blk_res = np.sum(integrand, axis=(2, 3)) * dkx * dky / (2 * np.pi) ** 2 return i0, i1, blk_res # (B, Ny) with ThreadPoolExecutor(max_workers=n_workers) as executor: for i0, i1, blk in executor.map(_process_block, boundaries): result[i0:i1, :] = blk return result else: raise ValueError("Only 1D and 2D are supported")
# =========================================================================== # kohn_nirenberg_nonperiodic (Dirichlet / non-periodic) # =========================================================================== # ── module-level phase-matrix cache (1-D only) ─────────────────────────────── _KN_CACHE: dict = {} def _cache_key_1d(x: np.ndarray, xi: np.ndarray) -> tuple: """ Stable cache key that invalidates automatically when the grid changes. Uses shape + endpoint values (not object id, which can be reused by Python). """ return ( x.shape, float(x[0]), float(x[-1]), xi.shape, float(xi[0]), float(xi[-1]), )
[docs] def kohn_nirenberg_nonperiodic( u_vals, x_grid, xi_grid, symbol_func, freq_window: str = 'gaussian', clamp: float = 1e6, space_window: bool = False, _cache: dict = _KN_CACHE, ): """ Apply a pseudo-differential operator using Kohn–Nirenberg quantization on a non‑periodic domain. The operator is defined by (Op(p) u)(x) = (1/(2π)) ∫ p(x, ξ) û(ξ) e^{i x ξ} dξ, where û(ξ) is the non‑periodic Fourier transform of u(x) on the given grids. For 1D inputs the function caches several objects that depend only on the spatial and frequency grids: the forward Fourier kernel, the inverse kernel, and pre‑computed frequency windows and spatial taper. These are reused automatically on subsequent calls with the same grids, reducing setup overhead. The cache can be cleared with `invalidate_kn_cache()`. For 2D inputs the computation is performed block‑wise along the first spatial dimension to control memory usage, and parallelised with a thread pool. Parameters ---------- u_vals : ndarray Function values at the spatial grid points. One‑dimensional array for 1D, two‑dimensional array for 2D (first dimension corresponds to x₁, second to x₂). x_grid : ndarray or tuple of ndarray Spatial grid(s). For 1D: a 1D array of coordinates. For 2D: a tuple (x1_grid, x2_grid) of two 1D arrays. xi_grid : ndarray or tuple of ndarray Frequency grid(s). For 1D: a 1D array. For 2D: a tuple (xi1_grid, xi2_grid). symbol_func : callable The symbol p(x, ξ) of the operator. * In 1D: called as `symbol_func(x, xi)` where `x` and `xi` are broadcastable arrays (e.g., `x[:, None]` and `xi[None, :]`). The function should support broadcasting to produce an array of shape `(len(x), len(xi))`. * In 2D: called as `symbol_func(X1, X2, XI1, XI2)` where the arguments are broadcastable arrays; the output shape should be compatible with `(Nx1, Nx2, Nxi1, Nxi2)`. Must return an array of complex numbers (or castable to complex128). freq_window : {'gaussian', 'hann'}, default='gaussian' Frequency‑space window applied to the symbol before the inverse transform. * 'gaussian' : exp(‑(ξ/σ_w)⁴) with σ_w = 0.8·max(|ξ|). * 'hann' : Hanning window over the full frequency range. clamp : float, default=1e6 Absolute value beyond which symbol entries are clipped. Helps avoid numerical overflow. space_window : bool, default=False If True, multiply the symbol by a Gaussian spatial taper centred in the middle of the spatial domain, with width equal to half the domain length. _cache : dict, optional Internal cache dictionary. Do not use directly; exposed only for testing and debugging. Returns ------- ndarray Result of applying the pseudo‑differential operator to `u_vals`. Same shape as `u_vals`. Notes ----- **Caching (1D only)** The cache stores for a given grid pair `(x, xi)`: * `phase_ft` : exp(‑i ξ x) – shape (Nxi, Nx) * `exp_matrix` : exp( i x ξ) – shape (Nx, Nxi) * `window_gauss` : 1D Gaussian window – shape (Nxi,) * `window_hann` : 1D Hann window – shape (Nxi,) * `spatial_taper`: 1D Gaussian taper – shape (Nx,) (if `space_window` ever used) The cache key is derived from the first and last points of both grids, so the cache is automatically invalidated when either grid changes in shape or endpoints. A warning is issued the first time a new grid pair is encountered. **2D implementation** To avoid forming a full 4D array, the computation is split into blocks along the first spatial dimension (x₁). Each block is processed independently by a worker thread. The number of workers is taken from `solver.FFT_WORKERS`. Memory usage scales as O(block_size × Nx₂ × Nxi₁ × Nxi₂). Examples -------- 1D example: apply the derivative operator i·ξ. >>> import numpy as np >>> from kn_operator import kohn_nirenberg_nonperiodic, invalidate_kn_cache >>> x = np.linspace(-10, 10, 200) >>> xi = np.fft.fftshift(np.fft.fftfreq(len(x), d=x[1]-x[0])) * 2*np.pi >>> u = np.exp(-x**2) >>> def symbol(x, xi): return 1j * xi # broadcasting works >>> du = kohn_nirenberg_nonperiodic(u, x, xi, symbol) >>> # Compare with analytical derivative: -2*x * exp(-x**2) """ # ========================================================================= # 1-D — with phase‑matrix cache and precomputed windows # ========================================================================= if u_vals.ndim == 1: x = np.asarray(x_grid) xi = np.asarray(xi_grid) dx = x[1] - x[0] dxi = xi[1] - xi[0] # ── Look up or build the cached objects ───────────────────────────── key = _cache_key_1d(x, xi) if key not in _cache: # Core Fourier matrices phase_ft = np.exp(-1j * np.outer(xi, x)) # (Nxi, Nx) exp_matrix = np.exp(1j * np.outer(x, xi)) # (Nx, Nxi) # Pre‑compute frequency windows (1D arrays) xi_abs_max = np.max(np.abs(xi)) sigma_w = 0.8 * xi_abs_max window_gauss = np.exp(-(xi / sigma_w) ** 4) # (Nxi,) window_hann = np.zeros_like(xi) mask = np.abs(xi) < xi_abs_max window_hann[mask] = 0.5 * (1.0 + np.cos(np.pi * xi[mask] / xi_abs_max)) # Spatial Gaussian taper (always computed; it is small) x_center = (x[0] + x[-1]) / 2.0 L_half = (x[-1] - x[0]) / 2.0 spatial_taper = np.exp(-((x - x_center) / L_half) ** 2) # (Nx,) _cache[key] = dict( phase_ft=phase_ft, exp_matrix=exp_matrix, window_gauss=window_gauss, window_hann=window_hann, spatial_taper=spatial_taper, ) warnings.warn( f"kohn_nirenberg_nonperiodic: building 1D cache " f"(Nx={len(x)}, Nxi={len(xi)}). " "This message appears only on the first call for this grid.", stacklevel=2, ) entry = _cache[key] phase_ft = entry['phase_ft'] # (Nxi, Nx) exp_matrix = entry['exp_matrix'] # (Nx, Nxi) window_gauss = entry['window_gauss'] # (Nxi,) window_hann = entry['window_hann'] # (Nxi,) spatial_taper = entry['spatial_taper'] # (Nx,) # ── Forward non‑periodic Fourier transform ───────────────────────── u_hat = dx * (phase_ft @ u_vals) # (Nxi,) # ── Symbol evaluation (may return shape (Nx,1), (1,Nxi) or (Nx,Nxi)) sigma_raw = symbol_func(x[:, None], xi[None, :]).astype(np.complex128, copy=False) # ── Clipping (in‑place, safe for any shape) ───────────────────────── np.clip(sigma_raw, -clamp, clamp, out=sigma_raw) # ── Frequency window (broadcast multiplication, result always (Nx,Nxi)) if freq_window == 'gaussian': sigma = sigma_raw * window_gauss[None, :] # broadcasts to (Nx,Nxi) elif freq_window == 'hann': sigma = sigma_raw * window_hann[None, :] else: sigma = sigma_raw # ── Optional spatial window (also broadcasts) ─────────────────────── if space_window: sigma = sigma * spatial_taper[:, None] # (Nx,Nxi) # ── Inverse quadrature as a single matrix‑vector product ─────────── weighted_exp = sigma * exp_matrix # (Nx, Nxi) result = (dxi / (2.0 * np.pi)) * (weighted_exp @ u_hat) # (Nx,) return result # ========================================================================= # 2-D — block‑parallel, memory‑efficient # ========================================================================= elif u_vals.ndim == 2: from solver import FFT_WORKERS from concurrent.futures import ThreadPoolExecutor x1, x2 = x_grid xi1, xi2 = xi_grid dx1 = x1[1] - x1[0] dx2 = x2[1] - x2[0] dxi1 = xi1[1] - xi1[0] dxi2 = xi2[1] - xi2[0] Nx1, Nx2 = len(x1), len(x2) Nxi1, Nxi2 = len(xi1), len(xi2) # ── Global non‑periodic FT of u ───────────────────────────────────── phase1 = np.exp(-1j * np.outer(xi1, x1)) # (Nxi1, Nx1) phase2 = np.exp(-1j * np.outer(x2, xi2)) # (Nx2, Nxi2) u_hat = dx1 * dx2 * (phase1 @ u_vals @ phase2) # (Nxi1, Nxi2) iph2 = np.exp(1j * np.outer(x2, xi2)) # (Nx2, Nxi2) if space_window: y_center = (x2[0] + x2[-1]) / 2.0 Ly = (x2[-1] - x2[0]) / 2.0 sw_x2 = np.exp(-((x2 - y_center) / Ly) ** 2) else: sw_x2 = None n_workers = FFT_WORKERS base = max(1, Nx1 // n_workers) boundaries = [ (i * base, min((i + 1) * base, Nx1)) for i in range(n_workers) if i * base < Nx1 ] result = np.zeros((Nx1, Nx2), dtype=np.complex128) def _process_block(bounds): i0, i1 = bounds x1_blk = x1[i0:i1] B = i1 - i0 X1b = x1_blk[:, None, None, None] X2b = x2[None, :, None, None] XI1b = xi1[None, None, :, None] XI2b = xi2[None, None, None, :] sv = symbol_func(X1b, X2b, XI1b, XI2b).astype(np.complex128) sv = np.clip(sv, -clamp, clamp) if freq_window == 'gaussian': s1 = 0.8 * np.max(np.abs(XI1b)) s2 = 0.8 * np.max(np.abs(XI2b)) sv *= np.exp(-(XI1b / s1) ** 4) * np.exp(-(XI2b / s2) ** 4) elif freq_window == 'hann': Wx = 0.5 * (1 + np.cos(np.pi * XI1b / np.max(np.abs(xi1)))) Wy = 0.5 * (1 + np.cos(np.pi * XI2b / np.max(np.abs(xi2)))) sv *= (Wx * Wy * (np.abs(XI1b) < np.max(np.abs(xi1))) * (np.abs(XI2b) < np.max(np.abs(xi2)))) if space_window: x_center = (x1[0] + x1[-1]) / 2.0 Lx = (x1[-1] - x1[0]) / 2.0 sv *= np.exp(-((x1_blk - x_center) / Lx) ** 2)[:, None, None, None] if sw_x2 is not None: sv *= sw_x2[None, :, None, None] iph1 = np.exp(1j * np.outer(x1_blk, xi1)).reshape(B, 1, Nxi1, 1) iph2_b = iph2[None, :, None, :] phase = iph1 * iph2_b # (B, Nx2, Nxi1, Nxi2) integrand = sv * u_hat[None, None, :, :] * phase blk_res = (dxi1 * dxi2 / (2.0 * np.pi) ** 2) * np.sum( integrand, axis=(2, 3) ) return i0, i1, blk_res with ThreadPoolExecutor(max_workers=n_workers) as executor: for i0, i1, blk in executor.map(_process_block, boundaries): result[i0:i1, :] = blk return result else: raise NotImplementedError("Only 1D and 2D supported")
[docs] def invalidate_kn_cache(): """ Clear the phase-matrix cache. Call this after any solver.setup() call that changes Lx, Nx, or the frequency grid, so the next apply re-builds the matrices for the new grid. """ _KN_CACHE.clear()