from contextlib import contextmanager
from itertools import chain
from typing import Optional, Union
from astropy.units.quantity import Quantity
from astropy.modeling import Model, CompoundModel
import numpy as np
import portion.interval
from scipy.integrate import quad_vec
def _map_interval(func):
"""Apply a function to the bounds of an interval."""
def wrapper(interval):
lower = func(interval.lower)
upper = func(interval.upper)
if lower > upper:
lower, upper = upper, lower
return portion.open(lower, upper)
return wrapper
def _to_open_interval(interval: portion.Interval) -> portion.Interval:
"""Convert any interval to an open interval."""
return portion.open(interval.lower, interval.upper)
@contextmanager
def _with_portion_float_inf():
"""Temporarily set infinity used by the portion library to `float('inf')`.
`float('inf')` is comparable with any Astropy quantity, but `portion.inf`
is not. The portion uses its representation of infinity internally in order
to calculate complement sets.
"""
tmp = portion.interval.inf
portion.interval.inf = float('inf')
try:
yield
finally:
portion.interval.inf = tmp
def _get_1d_units_for_dict(units: dict) -> Union[Quantity, float]:
if units:
(_, units), = units.items()
if not units:
units = 1
return units
@_with_portion_float_inf()
def _get_intervals(model: Model) -> portion.Interval:
"""Calculate the model breakpoints for a 1D Model by combining
the bounding box endpoints of the submodels
Parameters
----------
model : :class:`astropy.modeling.Model`
The model.
Returns
-------
points : list, float
Model breakpoints to facilitate model integration, sorted from low
to high. The range for integration will be from the first to the last
entry in the list.
Notes
-----
Models without bounding boxes are valid in full domain, so breakpoints are
given as +/- `numpy.inf`. However, infinities will be discarded if
at least one submodel has a bounding box.
Examples
--------
>>> from astropy.modeling import models
>>> from astropy import units as u
>>> from m4opt.models.math import _get_intervals
For a simple, bounded model, `_get_intervals` returns a single interval:
>>> _get_intervals(models.Box1D(width=3))
(-1.5,1.5)
This also works with models that have dimensionful input:
>>> _get_intervals(models.Box1D(width=3*u.m))
(<Quantity -1.5 m>,<Quantity 1.5 m>)
For unbounded models, `_get_intervals` returns an infinite interval:
>>> _get_intervals(models.Exponential1D())
(-inf,inf)
>>> _get_intervals(models.Exponential1D(tau=1*u.m))
(<Quantity -inf m>,<Quantity inf m>)
Compound models are supported too. Supported operations include function
composition (`|`):
>>> _get_intervals(models.Box1D(width=3) | models.Shift(offset=2))
(0.5,3.5)
>>> _get_intervals(models.Box1D(width=3*u.m) | models.Shift(offset=2*u.m))
(<Quantity 0.5 m>,<Quantity 3.5 m>)
Multiplication (`*`) results in an intersection:
>>> model1 = models.Box1D(x_0=-0.5, width=3)
>>> model2 = models.Box1D(x_0=0.5, width=3)
>>> _get_intervals(model1)
(-2.0,1.0)
>>> _get_intervals(model2)
(-1.0,2.0)
>>> _get_intervals(model1 * model2)
(-1.0,1.0)
>>> model1 = models.Box1D(x_0=-0.5*u.m, width=3*u.m)
>>> model2 = models.Box1D(x_0=0.5*u.m, width=3*u.m)
>>> _get_intervals(model1 * model2)
(<Quantity -1. m>,<Quantity 1. m>)
Addition (`+`) and subtraction (`-`) result in a union, but holes are kept.
>>> model1 = models.Box1D(x_0=-0.5, width=3)
>>> model2 = models.Box1D(x_0=0.5, width=3)
>>> _get_intervals(model1 + model2)
(-2.0,-1.0) | (-1.0,2.0)
>>> _get_intervals(model1 - model2)
(-2.0,-1.0) | (-1.0,2.0)
>>> model1 = models.Box1D(x_0=-0.5*u.m, width=3*u.m)
>>> model2 = models.Box1D(x_0=0.5*u.m, width=3*u.m)
>>> _get_intervals(model1 + model2)
(<Quantity -2. m>,<Quantity -1. m>) | (<Quantity -1. m>,<Quantity 2. m>)
>>> _get_intervals(model1 - model2)
(<Quantity -2. m>,<Quantity -1. m>) | (<Quantity -1. m>,<Quantity 2. m>)
"""
if isinstance(model, CompoundModel):
if model.op == '|':
return _get_intervals(model.left).apply(_map_interval(model.right))
elif model.op == '*':
return _get_intervals(model.left) & _get_intervals(model.right)
elif model.op in {'+', '-', '/'}:
lhs = portion.IntervalDict({_get_intervals(model.left): 'lhs'})
rhs = portion.IntervalDict({_get_intervals(model.right): 'rhs'})
union = lhs | rhs
return portion.Interval(
*(intervals.apply(_to_open_interval) for intervals in union))
else:
raise NotImplementedError(f'operation {model.op} not supported')
else:
try:
bbox = model.bounding_box
except NotImplementedError:
unit = _get_1d_units_for_dict(model.input_units)
return portion.open(-np.inf * unit, np.inf * unit)
else:
(lo, hi), = bbox
return portion.open(lo, hi)
[docs]
def integrate(model: Model, *,
quick_and_dirty_npts: Optional[int] = None,
**kwargs) -> Union[Quantity, float]:
"""Integrate a 1D model using adaptive trapezoidal quadrature.
Parameters
----------
model : :class:`astropy.modeling.Model`
The model to integrate.
quick_and_dirty_npts : int, optional
Number of sample points. If provided, disable adaptive quadrature and
instead use fixed-order quadrature with the specified number of
regularly spaced sample points.
**kwargs : dict
Additional keyword arguments passed to
:meth:`scipy.integrate.quad_vec`.
Returns
-------
integral : float, :class:`astropy.units.Quantity`
The integral over the bounding box of the model's inputs.
Notes
-----
When integrating compound models, the domain of integration is subdivided
using the bounding box endpoints of all of the submodels. The integrator
minimizes the error by tracking the error contributions from all of the
subdivisions. The integrator can be slow; passing lower tolerances can
speed up the calculation with a loss of accuracy. For a faster version
without convergence checking, try the `quick_and_dirty_npts` option.
Examples
--------
You can integrate dimensionless models:
>>> from astropy.modeling import models
>>> from astropy import units as u
>>> from m4opt.models import integrate
>>> import numpy as np
>>> model = models.Box1D(width=3)
>>> integrate(model)
3.0
>>> integrate(model, quick_and_dirty_npts=10000)
3.0
>>> model = models.Gaussian1D() * models.Const1D(1 / np.sqrt(2 * np.pi))
>>> integrate(model, epsrel=1e-7)
0.9999999648585338
>>> integrate(model, quick_and_dirty_npts=10000)
0.9999999620207557
Or models with dimensions:
>>> model = models.Lorentz1D(x_0=1 * u.micron, fwhm=0.1 * u.micron,
... amplitude=1 * u.erg / u.micron)
>>> integrate(model, epsrel=1e-7)
<Quantity 0.1550799 erg>
>>> integrate(model, quick_and_dirty_npts=10000)
<Quantity 0.1550799 erg>
"""
x_unit = _get_1d_units_for_dict(model.input_units)
intervals = _get_intervals(model)
a, *points, b = np.unique(Quantity(list(chain.from_iterable(
(i.lower, i.upper) for i in intervals)), x_unit).value.ravel())
# FIXME: Cannot rely on model.return_units because it is not set for all
# models (e.g. Lorentz1D).
y_unit = getattr(model(a * x_unit), 'unit', None) or 1
def func(x):
result = model(x * x_unit)
if hasattr(result, 'to_value'):
result = result.to_value()
return result
if quick_and_dirty_npts is not None:
x = np.linspace(a, b, quick_and_dirty_npts)
yint = np.trapz(func(x), x)
else:
yint, _ = quad_vec(func, a, b, points=points,
quadrature='trapezoid', **kwargs)
return yint * x_unit * y_unit