Source code for sunkit_spex.models.physical.thermal

import copy
import warnings

import numpy as np
from scipy import interpolate, stats

import astropy.units as u
from astropy.modeling import FittableModel, Parameter
from astropy.table.column import Column

from sunpy.data import manager

from sunkit_spex.models.physical.io import (
    load_chianti_continuum,
    load_chianti_lines_lite,
    load_xray_abundances,
)

# The default elemental abundance values correspond to coronal values
DEFAULT_ABUNDANCE_TYPE = "sun_coronal_ext"

__all__ = ["ContinuumEmission", "LineEmission", "ThermalEmission"]

doc_string_params = """
Parameters
----------
energy_edges: `astropy.units.Quantity`
    The edges of the energy bins in a 1D N+1 quantity.

temperature: `astropy.units.Quantity`
    The temperature of the plasma.
    Can be scalar or 1D of any length. If not scalar, the flux for each temperature
    will be calculated. The first dimension of the output flux will correspond
    to temperature.

emission_measure: `astropy.units.Quantity`
    The emission measure of the plasma at each temperature.
    Must be same length as temperature or scalar. This is passed in units of cm**-3, however is scaled and therefore is
    in units of 1e49cm**-3.

abundance_type: `str` (optional)
    Abundance type to use.  Options are:
        1. cosmic
        2. sun_coronal - default abundance
        3. sun_coronal_ext
        4. sun_hybrid
        5. sun_hybrid_ext
        6. sun_photospheric
        7. mewe_cosmic
        8. mewe_solar
    The values for each abundance type is stored in the global
    variable DEFAULT_ABUNDANCES which is generated by `setup_default_abundances`
    function. To load different default values for each abundance type,
    see the docstring of that function.


    mg: `float`
    Abundance of MG.

    al = `float`
    Abundance of Al.

    si =  `float`
    Abundance of Si.

    s =  `float`
    Abundance of S.

    ar =  `float`
    Abundance of Ar.

    ca =  `float`
    Abundance of Ca.

    fe =  `float`
    Abundance of Fe.

Returns
-------
flux: `astropy.units.Quantity`
    The photon flux as a function of temperature and energy.
"""

doc_string_notes = """
Notes
----------

The atomic abundances are set by the abundance_type, however they can also be fit as free parameters or specified
by the user to custom values.

"""


[docs] class ThermalEmission(FittableModel): f"""Calculate the thermal X-ray spectrum (lines + continuum) from the solar atmosphere. The flux is calculated as a function of temperature and emission measure. Which continuum mechanisms are included --- free-free, free-bound, or two-photon --- are determined by the file from which the continuum parameters are loaded. To change the file used, see the setup_continuum_parameters() function. Example ======== .. plot:: :include-source: import astropy.units as u import numpy as np import matplotlib.pyplot as plt from astropy.visualization import quantity_support from sunkit_spex.models.scaling import ThermalEmission ph_energies = np.arange(4, 100, 0.5)*u.keV ph_energies_centers = ph_energies[:-1] + 0.5*np.diff(ph_energies) source = ThermalEmission()(ph_energies) with quantity_support(): plt.figure() plt.plot(ph_energies_centers , source) plt.loglog() plt.legend() plt.show() {doc_string_params}""" name = "ThermalEmission" n_inputs = 1 n_outputs = 1 temperature = Parameter( name="temperature", default=10, min=1, max=100, unit=u.MK, description="Temperature of the plasma", fixed=False, ) emission_measure = Parameter( name="emission_measure", default=1, unit=(u.cm ** (-3)), description="Emission measure of the observer", fixed=False, ) mg = Parameter(name="Mg", default=8.15, min=6.15, max=10.15, description="Mg relative abundance", fixed=True) al = Parameter(name="Al", default=7.04, min=5.04, max=9.04, description="Al relative abundance", fixed=True) si = Parameter(name="Si", default=8.1, min=6.1, max=10.1, description="Si relative abundance", fixed=True) s = Parameter(name="S", default=7.27, min=5.27, max=9.27, description="S relative abundance", fixed=True) ar = Parameter(name="Ar", default=6.58, min=4.58, max=8.58, description="Ar relative abundance", fixed=True) ca = Parameter(name="Ca", default=6.93, min=4.93, max=8.93, description="Ca relative abundance", fixed=True) fe = Parameter(name="Fe", default=8.1, min=6.1, max=10.1, description="Fe relative abundance", fixed=True) # input_units_equivalencies = {"keV": u.spectral(), "K": u.temperature_energy()} _input_units_allow_dimensionless = True def __init__( self, temperature=u.Quantity(temperature.default, temperature.unit), emission_measure=u.Quantity(emission_measure.default, emission_measure.unit), mg=mg.default, al=al.default, si=si.default, s=s.default, ar=ar.default, ca=ca.default, fe=fe.default, abundance_type=DEFAULT_ABUNDANCE_TYPE, **kwargs, ): self.abundance_type = abundance_type if abundance_type != DEFAULT_ABUNDANCE_TYPE: mg, al, si, s, ar, ca, fe = _initialize_abundances(DEFAULT_ABUNDANCES[abundance_type]) self.line = LineEmission( temperature=temperature, emission_measure=emission_measure, mg=mg, al=al, si=si, s=s, ar=ar, ca=ca, fe=fe, abundance_type=abundance_type, ) self.cont = ContinuumEmission( temperature=temperature, emission_measure=emission_measure, mg=mg, al=al, si=si, s=s, ar=ar, ca=ca, fe=fe, abundance_type=abundance_type, ) super().__init__( temperature=temperature, emission_measure=emission_measure, mg=mg, al=al, si=si, s=s, ar=ar, ca=ca, fe=fe, **kwargs, )
[docs] def evaluate( self, energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, ): line_flux = self.line.evaluate( energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, ) cont_flux = self.cont.evaluate( energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, ) return line_flux + cont_flux
@property def input_units(self): # The units for the 'energy_edges' variable should be an energy (default keV) return {self.inputs[0]: u.keV} @property def return_units(self): return {self.outputs[0]: u.ph / u.keV * u.s**-1} def _parameter_units_for_data_units(self, inputs_unit, outputs_unit): return {"temperature": u.MK, "emission_measure": (u.cm ** (-3))}
[docs] class ContinuumEmission(FittableModel): f"""Calculate the thermal X-ray continuum emission from the solar atmosphere. The emission is calculated as a function of temperature and emission measure. Which continuum mechanisms are included --- free-free, free-bound, or two-photon --- are determined by the file from which the comtinuum parameters are loaded. To change the file used, see the setup_continuum_parameters() function. Example ======== .. plot:: :include-source: import astropy.units as u import numpy as np import matplotlib.pyplot as plt from astropy.visualization import quantity_support from sunkit_spex.models.scaling import ThermalEmission ph_energies = np.arange(4, 100, 0.5)*u.keV ph_energies_centers = ph_energies[:-1] + 0.5*np.diff(ph_energies) source = ContinuumEmission()(ph_energies) with quantity_support(): plt.figure() plt.plot(ph_energies_centers , source) plt.loglog() plt.legend() plt.show() {doc_string_params}""" n_inputs = 1 n_outputs = 1 temperature = Parameter( name="temperature", default=10, min=1, max=100, unit=u.MK, description="Temperature of the plasma", fixed=False, ) emission_measure = Parameter( name="emission_measure", default=1, unit=(u.cm ** (-3)), description="Emission measure of the observer", fixed=False, ) mg = Parameter(name="Mg", default=8.15, min=6.15, max=10.15, description="Mg relative abundance", fixed=True) al = Parameter(name="Al", default=7.04, min=5.04, max=9.04, description="Al relative abundance", fixed=True) si = Parameter(name="Si", default=8.1, min=6.1, max=10.1, description="Si relative abundance", fixed=True) s = Parameter(name="S", default=7.27, min=5.27, max=9.27, description="S relative abundance", fixed=True) ar = Parameter(name="Ar", default=6.58, min=4.58, max=8.58, description="Ar relative abundance", fixed=True) ca = Parameter(name="Ca", default=6.93, min=4.93, max=8.93, description="Ca relative abundance", fixed=True) fe = Parameter(name="Fe", default=8.1, min=6.1, max=10.1, description="Fe relative abundance", fixed=True) _input_units_allow_dimensionless = True def __init__( self, temperature=u.Quantity(temperature.default, temperature.unit), emission_measure=u.Quantity(emission_measure.default, emission_measure.unit), mg=mg.default, al=al.default, si=si.default, s=s.default, ar=ar.default, ca=ca.default, fe=fe.default, abundance_type=DEFAULT_ABUNDANCE_TYPE, **kwargs, ): self.abundance_type = abundance_type if abundance_type != DEFAULT_ABUNDANCE_TYPE: mg, al, si, s, ar, ca, fe = _initialize_abundances(DEFAULT_ABUNDANCES[abundance_type]) super().__init__( temperature=temperature, emission_measure=emission_measure, mg=mg, al=al, si=si, s=s, ar=ar, ca=ca, fe=fe, **kwargs, )
[docs] def evaluate( self, energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, ): return continuum_emission( energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, self.abundance_type, )
@property def input_units(self): # The units for the 'energy_edges' variable should be an energy (default keV) return {self.inputs[0]: u.keV} @property def return_units(self): return {self.outputs[0]: u.ph / u.keV * u.s**-1} def _parameter_units_for_data_units(self, inputs_unit, outputs_unit): return {"temperature": u.MK, "emission_measure": (u.cm ** (-3))}
[docs] class LineEmission(FittableModel): f""" Calculate thermal line emission from the solar corona. Example ======== .. plot:: :include-source: import astropy.units as u import numpy as np import matplotlib.pyplot as plt from astropy.visualization import quantity_support from sunkit_spex.models.scaling import ThermalEmission ph_energies = np.arange(4, 100, 0.5)*u.keV ph_energies_centers = ph_energies[:-1] + 0.5*np.diff(ph_energies) source = LineEmission()(ph_energies) with quantity_support(): plt.figure() plt.plot(ph_energies_centers , source) plt.loglog() plt.legend() plt.show() {doc_string_params}""" n_inputs = 1 n_outputs = 1 temperature = Parameter( name="temperature", default=10, min=1, max=100, unit=u.MK, description="Temperature of the plasma", fixed=False, ) emission_measure = Parameter( name="emission_measure", default=1e50, unit=(u.cm ** (-3)), description="Emission measure of the observer", fixed=False, ) mg = Parameter(name="Mg", default=8.15, min=6.15, max=10.15, description="Mg relative abundance", fixed=True) al = Parameter(name="Al", default=7.04, min=5.04, max=9.04, description="Al relative abundance", fixed=True) si = Parameter(name="Si", default=8.1, min=6.1, max=10.1, description="Si relative abundance", fixed=True) s = Parameter(name="S", default=7.27, min=5.27, max=9.27, description="S relative abundance", fixed=True) ar = Parameter(name="Ar", default=6.58, min=4.58, max=8.58, description="Ar relative abundance", fixed=True) ca = Parameter(name="Ca", default=6.93, min=4.93, max=8.93, description="Ca relative abundance", fixed=True) fe = Parameter(name="Fe", default=8.1, min=6.1, max=10.1, description="Fe relative abundance", fixed=True) _input_units_allow_dimensionless = True def __init__( self, temperature=u.Quantity(temperature.default, temperature.unit), emission_measure=u.Quantity(emission_measure.default, emission_measure.unit), mg=mg.default, al=al.default, si=si.default, s=s.default, ar=ar.default, ca=ca.default, fe=fe.default, abundance_type=DEFAULT_ABUNDANCE_TYPE, **kwargs, ): self.abundance_type = abundance_type if abundance_type != DEFAULT_ABUNDANCE_TYPE: mg, al, si, s, ar, ca, fe = _initialize_abundances(DEFAULT_ABUNDANCES[abundance_type]) super().__init__( temperature=temperature, emission_measure=emission_measure, mg=mg, al=al, si=si, s=s, ar=ar, ca=ca, fe=fe, **kwargs, )
[docs] def evaluate( self, energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, ): return line_emission( energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, self.abundance_type, )
@property def input_units(self): # The units for the 'energy_edges' variable should be an energy (default keV) return {self.inputs[0]: u.keV} @property def return_units(self): return {self.outputs[0]: u.ph / u.keV * u.s**-1} def _parameter_units_for_data_units(self, inputs_unit, outputs_unit): return {"temperature": u.MK, "emission_measure": (u.cm ** (-3))}
def setup_continuum_parameters(filename=None): """ Define continuum intensities as a function of temperature. Intensities are set as global variables and used in calculation of spectra by other functions in this module. They are in units of per volume emission measure at source, i.e. they must be divided by 4 * pi R**2 to be converted to physical values where R**2 is observer distance. Intensities are derived from output from the CHIANTI atomic physics database. The default CHIANTI data used here are collected from `https://hesperia.gsfc.nasa.gov/ssw/packages/xray/dbase/chianti/chianti_cont_1_250_v71.sav`. This includes contributions from thermal bremsstrahlung and two-photon interactions. To use a different file, provide the URL/file location via the filename kwarg, e.g. to include only thermal bremsstrahlung, set the filename kwarg to 'https://hesperia.gsfc.nasa.gov/ssw/packages/xray/dbase/chianti/chianti_cont_1_250_v70_no2photon.sav' Parameters ---------- filename: `str` (optional) URL or file location of the CHIANTI IDL save file to be used. """ if filename: with manager.override_file("chianti_continuum", uri=filename): cont_info = load_chianti_continuum() else: cont_info = load_chianti_continuum() continuum_grid = {} continuum_grid["abundance index"] = cont_info.element_index.data continuum_grid["sorted abundance index"] = np.sort(continuum_grid["abundance index"]) T_grid = (cont_info.temperature.data * cont_info.attrs["units"]["temperature"]).to(u.K) continuum_grid["log10T"] = np.log10(T_grid.value) continuum_grid["T_keV"] = T_grid.to_value(u.keV, equivalencies=u.temperature_energy()) wavelength = cont_info.wavelength.data * cont_info.attrs["units"]["wavelength"] dwave_AA = (cont_info.attrs["wavelength_edges"][1:] - cont_info.attrs["wavelength_edges"][:-1]).to_value(u.AA) continuum_grid["E_keV"] = wavelength.to_value(u.keV, equivalencies=u.spectral()) continuum_grid["energy bin widths keV"] = continuum_grid["E_keV"] * dwave_AA / wavelength.to_value(u.AA) continuum_grid["intensity"] = cont_info.data continuum_grid["intensity unit"] = cont_info.attrs["units"]["data"] continuum_grid["intensity description"] = ( "Intensity is stored as photons per keV per unit emission measure at the source. " "It (and its unit) therefore must be multiplied by emission measure and " "divided by 4 * pi * observer_distance**2 to get observed values." ) continuum_grid["energy range keV"] = (continuum_grid["E_keV"].min(), continuum_grid["E_keV"].max()) continuum_grid["temperature range K"] = (T_grid.value.min(), T_grid.value.max()) return continuum_grid def setup_line_parameters(filename=None): """Define line intensities as a function of temperature for calculating line emission. Line intensities are set as global variables and used in the calculation of spectra by other functions in this module. They are in units of per unit emission measure at source, i.e. they must be divided by 4 pi R**2 (where R is the observer distance) and multiplied by emission measure to be converted to physical values at the observer. Line intensities are derived from output from the CHIANTI atomic physics database. The default CHIANTI data used here is collected from `https://hesperia.gsfc.nasa.gov/ssw/packages/xray/dbase/chianti/chianti_lines_1_10_v71.sav`. To use a different file, provide the URL/file location via the filename kwarg. Parameters ---------- filename: `str` (optional) URL or file location of the CHIANTI IDL save file to be used. """ if filename: with manager.override_file("chianti_lines", uri=filename): line_info = load_chianti_lines_lite() else: line_info = load_chianti_lines_lite() line_grid = {} line_grid["intensity"] = np.array(line_info.data) line_grid["intensity unit"] = line_info.attrs["units"]["data"] line_grid["intensity description"] = ( "Intensity is stored as photons per unit emission measure at the source. " "It (and its unit) therefore must be multiplied by emission measure and " "divided by 4 * pi * observer_distance**2 to get observed values." ) line_grid["line peaks keV"] = (line_info.peak_energy.data << line_info.attrs["units"]["peak_energy"]).to_value( u.keV, equivalencies=u.spectral() ) line_grid["log10T"] = line_info.logT.data line_grid["abundance index"] = line_info.attrs["element_index"] line_grid["line atomic numbers"] = line_info.atomic_number.data line_grid["energy range keV"] = (line_grid["line peaks keV"].min(), line_grid["line peaks keV"].max()) T_grid = 10 ** line_grid["log10T"] line_grid["temperature range K"] = (T_grid.min(), T_grid.max()) return line_grid def setup_default_abundances(filename=None): """ Read default abundance values into global variable. By default, data is read from the following file: https://hesperia.gsfc.nasa.gov/ssw/packages/xray/dbase/chianti/xray_abun_file.genx To load data from a different file, see Notes section. Parameters ---------- filename: `str` (optional) URL or file location of the .genx abundance file to be used. """ if filename: with manager.override_file("xray_abundance", uri=filename): return load_xray_abundances() else: return load_xray_abundances() # Read line, continuum and abundance data into global variables. CONTINUUM_GRID = setup_continuum_parameters() LINE_GRID = setup_line_parameters() DEFAULT_ABUNDANCES = setup_default_abundances() @u.quantity_input def continuum_emission( energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, abundance_type=DEFAULT_ABUNDANCE_TYPE, ): f"""Calculate the thermal X-ray continuum emission from the solar atmosphere. The emission is calculated as a function of temperature and emission measure. Which continuum mechanisms are included --- free-free, free-bound, or two-photon --- are determined by the file from which the comtinuum parameters are loaded. To change the file used, see the setup_continuum_parameters() function. {doc_string_params}""" # Sanitize inputs energy_edges_keV, temperature_K, emission_measure = _sanitize_inputs(energy_edges, temperature, emission_measure) _error_if_low_energy_input_outside_valid_range( energy_edges_keV.value, CONTINUUM_GRID["energy range keV"], "energy", "keV" ) _warn_if_input_outside_valid_range(energy_edges_keV.value, CONTINUUM_GRID["energy range keV"], "energy", "keV") _error_if_input_outside_valid_range(temperature_K.value, CONTINUUM_GRID["temperature range K"], "temperature", "K") # # Calculate abundances abundances = _calculate_abundances(abundance_type, mg, al, si, s, ar, ca, fe) # Calculate flux. flux = _continuum_emission(energy_edges_keV, temperature_K, abundances) flux *= emission_measure * 1e49 if temperature_K.isscalar and emission_measure.isscalar: flux = flux[0] elif len(temperature_K) == 1 and len(emission_measure) == 1: flux = flux[0] return flux @u.quantity_input def line_emission( energy_edges, temperature, emission_measure, mg, al, si, s, ar, ca, fe, abundance_type=DEFAULT_ABUNDANCE_TYPE, ): f""" Calculate thermal line emission from the solar corona. {doc_string_params}""" # Sanitize inputs energy_edges_keV, temperature_K, emission_measure = _sanitize_inputs(energy_edges, temperature, emission_measure) # Convert inputs to known units and confirm they are within range. _error_if_low_energy_input_outside_valid_range( energy_edges_keV.value, LINE_GRID["energy range keV"], "energy", "keV" ) _warn_if_input_outside_valid_range(energy_edges_keV.value, LINE_GRID["energy range keV"], "energy", "keV") _error_if_input_outside_valid_range(temperature_K.value, LINE_GRID["temperature range K"], "temperature", "K") # # Calculate abundances abundances = _calculate_abundances(abundance_type, mg, al, si, s, ar, ca, fe) flux = _line_emission(energy_edges_keV, temperature_K, abundances) flux *= emission_measure * 1e49 if temperature_K.isscalar and emission_measure.isscalar: flux = flux[0] elif len(temperature_K) == 1 and len(emission_measure) == 1: flux = flux[0] return flux def _continuum_emission(energy_edges_keV, temperature_K, abundances): """ Calculates emission-measure-normalized X-ray continuum spectrum at the source. Output must be multiplied by emission measure and divided by 4*pi*observer_distance**2 to get physical values. Which continuum mechanisms are included --- free-free, free-bound, or two-photon --- are determined by the file from which the comtinuum parameters are loaded. To change the file used, see the setup_continuum_parameters() function. Parameters ---------- energy_edges_keV: 1-D array-like Boundaries of contiguous spectral bins in units on keV. temperature_K: 1-D array-like The temperature(s) of the plasma in unit of K. Must not be a scalar. abundances: 1-D `numpy.array` of same length a DEFAULT_ABUNDANCES. The abundances for the all the elements. """ # Handle inputs and derive some useful parameters from them temperature_K = temperature_K log10T_in = np.log10(temperature_K.value) T_in_keV = temperature_K.to("keV", equivalencies=u.temperature_energy()) energy_edges_keV = energy_edges_keV.to(u.keV) # Get energy bins centers based on geometric mean. energy_gmean_keV = stats.gmean(np.vstack((energy_edges_keV[:-1], energy_edges_keV[1:]))) # Mask Unwanted Abundances abundance_mask = np.zeros(len(abundances)) abundance_mask[CONTINUUM_GRID["abundance index"]] = 1.0 abundances *= abundance_mask # Calculate Continuum Intensity Summed Over All Elements ##### For Each Temperature as a function of Energy/Wavelength ###### # Before looping over temperatures, let's perform the calculations that are # used over again in the for loop. # 1. If many temperatures are input, convolve intensity grid with abundances for all # temperatures here. If only a few temperatures are input, do this step only # when looping over input temperatures. This minimizes computation. n_tband = 3 n_t_grid = len(CONTINUUM_GRID["log10T"]) n_temperature_K = len(temperature_K) n_thresh = n_temperature_K * n_tband if n_thresh >= n_t_grid: intensity_per_em_at_source_allT = np.zeros(CONTINUUM_GRID["intensity"].shape[1:]) for i in range(0, n_t_grid): intensity_per_em_at_source_allT[i] = np.matmul( abundances[CONTINUUM_GRID["sorted abundance index"]], CONTINUUM_GRID["intensity"][:, i] ) # 2. Add dummy axes to energy and temperature grid arrays for later vectorized operations. repeat_E_grid = CONTINUUM_GRID["E_keV"][np.newaxis, :] repeat_T_grid = CONTINUUM_GRID["T_keV"][:, np.newaxis] dE_grid_keV = CONTINUUM_GRID["energy bin widths keV"][np.newaxis, :] # 3. Identify the indices of the temperature bins containing each input temperature and # the bins above and below them. For each input temperature, these three bins will # act as a temperature band over which we'll interpolate the continuum emission. selt = np.digitize(log10T_in, CONTINUUM_GRID["log10T"]) - 1 tband_idx = selt[:, np.newaxis] + np.arange(n_tband)[np.newaxis, :] # Finally, loop over input temperatures and calculate continuum emission for each. flux = np.zeros((n_temperature_K, len(energy_gmean_keV))) for j, logt in enumerate(log10T_in): # If not already done above, calculate continuum intensity summed over # all elements as a function of energy/wavelength over the temperature band. if n_thresh < n_t_grid: element_intensities_per_em_at_source = CONTINUUM_GRID["intensity"][:, tband_idx[j]] intensity_per_em_at_source = np.zeros(element_intensities_per_em_at_source.shape[1:]) for i in range(0, n_tband): intensity_per_em_at_source[i] = np.matmul( abundances[CONTINUUM_GRID["sorted abundance index"]], element_intensities_per_em_at_source[:, i] ) else: intensity_per_em_at_source = intensity_per_em_at_source_allT[tband_idx[j]] ##### Calculate Continuum Intensity at Input Temperature ###### # Do this by interpolating the normalized temperature component # of the intensity grid to input temperature(s) and then rescaling. # Calculate normalized temperature component of the intensity grid. exponent = repeat_E_grid / repeat_T_grid[tband_idx[j]] exponential = np.exp(np.clip(exponent, None, 80)) gaunt = intensity_per_em_at_source / dE_grid_keV * exponential # Interpolate the normalized temperature component of the intensity grid the the # input temperature. flux[j] = _interpolate_continuum_intensities( gaunt, CONTINUUM_GRID["log10T"][tband_idx[j]], CONTINUUM_GRID["E_keV"], energy_gmean_keV, logt ) # Rescale the interpolated intensity. energy_gmean_keV = energy_gmean_keV * u.keV flux = flux * np.exp(-(energy_gmean_keV[np.newaxis, :] / T_in_keV[:, np.newaxis])) # Put intensity into correct units. return flux * CONTINUUM_GRID["intensity unit"] def _line_emission(energy_edges_keV, temperature_K, abundances): """ Calculates emission-measure-normalized X-ray line spectrum at the source. Output must be multiplied by emission measure and divided by 4*pi*observer_distance**2 to get physical values. Parameters ---------- energy_edges_keV: 1-D array-like Boundaries of contiguous spectral bins in units on keV. temperature_K: 1-D array-like The temperature(s) of the plasma in unit of K. Must not be a scalar. abundances: 1-D `numpy.array` of same length as DEFAULT_ABUNDANCES. The abundances for the all the elements. """ n_energy_bins = len(energy_edges_keV) - 1 energy_edges_keV = energy_edges_keV.value temperature_K = temperature_K.value n_temperatures = len(temperature_K) # Find indices of lines within user input energy range. energy_roi_indices = np.logical_and( LINE_GRID["line peaks keV"] >= energy_edges_keV.min(), LINE_GRID["line peaks keV"] <= energy_edges_keV.max() ) n_energy_roi_indices = energy_roi_indices.sum() # If there are emission lines within the energy range of interest, compile spectrum.# @u.quantity_input( # energy_edges=u.keV, temperature=u.K, emission_measure=(u.cm ** (-3), u.cm ** (-5)), observer_distance=u.cm # ) if n_energy_roi_indices > 0: # Mask Unwanted Abundances abundance_mask = np.zeros(len(abundances)) abundance_mask[LINE_GRID["abundance index"]] = 1.0 abundances *= abundance_mask # Extract only the lines within the energy range of interest. line_abundances = abundances[LINE_GRID["line atomic numbers"][energy_roi_indices] - 2] # Above magic number of of -2 is comprised of: # a -1 to account for the fact that index is atomic number -1, and # another -1 because abundance index is offset from abundance index by 1. ##### Calculate Line Intensities within the Input Energy Range ##### # Calculate abundance-normalized intensity of each line in energy range of # interest as a function of energy and temperature. line_intensity_grid = LINE_GRID["intensity"][energy_roi_indices] line_intensities = _calculate_abundance_normalized_line_intensities( np.log10(temperature_K), line_intensity_grid, LINE_GRID["log10T"] ) # Scale line intensities by abundances to get true line intensities. line_intensities *= line_abundances ##### Weight Line Emission So Peak Energies Maintained Within Input Energy Binning ##### # Split emission of each line between nearest neighboring spectral bins in # proportion such that the line centroids appear at the correct energy # when averaged over neighboring bins. # This has the effect of appearing to double the number of lines as regards # the dimensionality of the line_intensities array. line_peaks_keV = LINE_GRID["line peaks keV"][energy_roi_indices] split_line_intensities, line_spectrum_bins = _weight_emission_bins_to_line_centroid( line_peaks_keV, energy_edges_keV, line_intensities ) #### Calculate Flux ##### # Use binned_statistic to determine which spectral bins contain # components of line emission and sum over those line components # to get the total emission is each spectral bin. flux = stats.binned_statistic( line_spectrum_bins, split_line_intensities, "sum", n_energy_bins, (0, n_energy_bins - 1) ).statistic else: flux = np.zeros((n_temperatures, n_energy_bins)) # Scale flux by observer distance, emission measure and spectral bin width # and put into correct units. energy_bin_widths = energy_edges_keV[1:] - energy_edges_keV[:-1] energy_bin_widths = energy_bin_widths * u.keV return flux * LINE_GRID["intensity unit"] / (energy_bin_widths) def _interpolate_continuum_intensities(data_grid, log10T_grid, energy_grid_keV, energy_keV, log10T): # Determine valid range based on limits of intensity grid's spectral extent # and the normalized temperature component of intensity. n_tband = len(log10T_grid) (vrange,) = np.where(data_grid[0] > 0) for i in range(1, n_tband): (vrange_i,) = np.where(data_grid[i] > 0) if len(vrange) < len(vrange_i): vrange = vrange_i data_grid = data_grid[:, vrange] energy_grid_keV = energy_grid_keV[vrange] (energy_idx,) = np.where(energy_keV < energy_grid_keV.max()) # Interpolate temperature component of intensity and derive continuum intensity. flux = np.zeros(energy_keV.shape) if len(energy_idx) > 0: energy_keV = energy_keV[energy_idx] cont0 = interpolate.interp1d(energy_grid_keV, data_grid[0])(energy_keV) cont1 = interpolate.interp1d(energy_grid_keV, data_grid[1])(energy_keV) cont2 = interpolate.interp1d(energy_grid_keV, data_grid[2])(energy_keV) # Calculate the continuum intensity as the weighted geometric mean # of the interpolated values across the temperature band of the # temperature component of intensity. logelog10T = np.log(log10T) x0, x1, x2 = np.log(log10T_grid) flux[energy_idx] = np.exp( np.log(cont0) * (logelog10T - x1) * (logelog10T - x2) / ((x0 - x1) * (x0 - x2)) + np.log(cont1) * (logelog10T - x0) * (logelog10T - x2) / ((x1 - x0) * (x1 - x2)) + np.log(cont2) * (logelog10T - x0) * (logelog10T - x1) / ((x2 - x0) * (x2 - x1)) ) return flux def _calculate_abundance_normalized_line_intensities(logT, data_grid, line_logT_bins): """ Calculates normalized line intensities at a given temperature using interpolation. Given a 2D array, say of line intensities, as a function of two parameters, say energy and log10(temperature), and a log10(temperature) value, interpolate the line intensities over the temperature axis and extract the intensities as a function of energy at the input temperature. Note that strictly speaking the code is agnostic to the physical properties of the axes and values in the array. All the matters is that data_grid is interpolated over the 2nd axis and the input value also corresponds to somewhere along that same axis. That value does not have to exactly correspond to the value of a column in the grid. This is accounted for by the interpolation. Parameters ---------- logT: 1D `numpy.ndarray` of `float`. The input value along the 2nd axis at which the line intensities are desired. If multiple values given, the calculation is done for each and the output array has an extra dimension. data_grid: 2D `numpy.ndarray` Some property, e.g. line intensity, as function two parameters, e.g. energy (0th dimension) and log10(temperature in kelvin) (1st dimension). line_logT_bins: 1D `numpy.ndarray` The value along the 2nd axis at which the data are required, say a value of log10(temperature in kelvin). Returns ------- interpolated_data: 1D or 2D `numpy.ndarray` The line intensities as a function of energy (1st dimension) at each of the input temperatures (0th dimension). Note that unlike the input line intensity table, energy here is the 0th axis. If there is only one input temperature, interpolated_data is 1D. """ # Ensure input temperatures are in an array to consistent manipulation. # if np.isscalar(logT): # n_temperatures = 1 # else: n_temperatures = len(logT) # Get bins in which input temperatures belong. temperature_bins = np.digitize(logT, line_logT_bins) - 1 # For each input "temperature", interpolate the grid over the 2nd axis # using the bins corresponding to the input "temperature" and the two neighboring bins. # This will result in a function giving the data as a function of the 1st axis, # say energy, at the input temperature to sub-temperature bin resolution. interpolated_data = np.zeros((n_temperatures, data_grid.shape[0])) for i in range(n_temperatures): # Identify the "temperature" bin to which the input "temperature" # corresponds and its two nearest neighbors. index = temperature_bins[i] - 1 + np.arange(3) # Interpolate the 2nd axis to produce a function that gives the data # as a function of 1st axis, say energy, at a given value along the 2nd axis, # say "temperature". get_intensities_at_logT = interpolate.interp1d(line_logT_bins[index], data_grid[:, index], kind="quadratic") # Use function to get interpolated_data as a function of the first axis at # the input value along the 2nd axis, # e.g. line intensities as a function of energy at a given temperature. interpolated_data[i, :] = get_intensities_at_logT(logT[i]).squeeze()[:] return interpolated_data def _weight_emission_bins_to_line_centroid(line_peaks_keV, energy_edges_keV, line_intensities): """ Split emission between neighboring energy bins such that averaged energy is the line peak. Given the peak energies of the lines and a set of the energy bin edges: 1. Find the bins into which each of the lines belong. 2. Calculate distance between the line peak energy and the center of the bin to which it corresponds as a fraction of the distance between the bin center the center of the next closest bin to the line peak energy. 3. Assign the above fraction of the line intensity to the neighboring bin and the rest of the energy to the original bin. 4. Add the neighboring bins to the array of bins containing positive emission. Parameters ---------- line_peaks_keV: 1D `numpy.ndarray` The energy of the line peaks in keV. energy_peak_keV: 1D `numpy.ndarray` The edges of adjacent energy bins. Length must be n+1 where n is the number of energy bins. These energy bins may be referred to as 'spectrum energy bins' in comments. line_intensities: 2D `numpy.ndarray` The amplitude of the line peaks. The last dimension represents intensities of each line in line_peaks_keV while the first dimension represents the intensities as a function of another parameter, e.g. temperature. These intensities are the ones divided between neighboring bins as described above. Returns ------- new_line_intensities: 2D `numpy.ndarray` The weighted line intensities including neighboring component for each line weighted such that total emission is the same, but the energy of each line averaged over the energy_edge_keV bins is the same as the actual line energy. new_iline: `numpy.ndarray` Indices of the spectrum energy bins to which emission from each line corresponds. This includes indices of the neighboring bin emission components. """ # Get widths and centers of the spectrum energy bins. energy_bin_widths = energy_edges_keV[1:] - energy_edges_keV[:-1] energy_centers = energy_edges_keV[:-1] + energy_bin_widths / 2 energy_center_diffs = energy_centers[1:] - energy_centers[:-1] # For each line, find the index of the spectrum energy bin to which it corresponds. iline = np.digitize(line_peaks_keV, energy_edges_keV) - 1 # Get the difference between each line energy and # the center of the spectrum energy bin to which is corresponds. line_deviations_keV = line_peaks_keV - energy_centers[iline] # Get the indices of the lines which are above and below their bin center. (neg_deviation_indices,) = np.where(line_deviations_keV < 0) (pos_deviation_indices,) = np.where(line_deviations_keV >= 0) # Discard bin indices at the edge of the spectral range if they should # be shared with a bin outside the energy range. neg_deviation_indices = neg_deviation_indices[np.where(iline[neg_deviation_indices] > 0)[0]] pos_deviation_indices = pos_deviation_indices[ np.where(iline[pos_deviation_indices] <= (len(energy_edges_keV) - 2))[0] ] # Split line emission between the spectrum energy bin containing the line peak and # the nearest neighboring bin based on the proximity of the line energy to # the center of the spectrum bin. # Treat lines which are above and below the bin center separately as slightly # different indexing is required. new_line_intensities = copy.deepcopy(line_intensities) new_iline = copy.deepcopy(iline) if len(neg_deviation_indices) > 0: neg_line_intensities, neg_neighbor_intensities, neg_neighbor_iline = _weight_emission_bins( line_deviations_keV, neg_deviation_indices, energy_center_diffs, line_intensities, iline, negative_deviations=True, ) # Combine new line and neighboring bin intensities and indices into common arrays. new_line_intensities[:, neg_deviation_indices] = neg_line_intensities new_line_intensities = np.concatenate((new_line_intensities, neg_neighbor_intensities), axis=-1) new_iline = np.concatenate((new_iline, neg_neighbor_iline)) if len(pos_deviation_indices) > 0: pos_line_intensities, pos_neighbor_intensities, pos_neighbor_iline = _weight_emission_bins( line_deviations_keV, pos_deviation_indices, energy_center_diffs, line_intensities, iline, negative_deviations=False, ) # Combine new line and neighboring bin intensities and indices into common arrays. new_line_intensities[:, pos_deviation_indices] = pos_line_intensities new_line_intensities = np.concatenate((new_line_intensities, pos_neighbor_intensities), axis=-1) new_iline = np.concatenate((new_iline, pos_neighbor_iline)) # Order new_line_intensities so neighboring intensities are next # to those containing the line peaks. ordd = np.argsort(new_iline) new_iline = new_iline[ordd] for i in range(new_line_intensities.shape[0]): new_line_intensities[i, :] = new_line_intensities[i, ordd] return new_line_intensities, new_iline def _weight_emission_bins( line_deviations_keV, deviation_indices, energy_center_diffs, line_intensities, iline, negative_deviations=True ): if negative_deviations is True: if not np.all(line_deviations_keV[deviation_indices] < 0): raise ValueError( "As negative_deviations is True, can only handle " "lines whose energy < energy bin center, " "i.e. all line_deviations_keV must be negative." ) a = -1 b = -1 else: if not np.all(line_deviations_keV[deviation_indices] >= 0): raise ValueError( "As negative_deviations is not True, can only handle " "lines whose energy >= energy bin center, " "i.e. all line_deviations_keV must be positive." ) a = 0 b = 1 # Calculate difference between line energy and the spectrum bin center as a # fraction of the distance between the spectrum bin center and the # center of the nearest neighboring bin. wghts = np.absolute(line_deviations_keV[deviation_indices]) / energy_center_diffs[iline[deviation_indices + a]] # Tile/replicate wghts through the other dimension of line_intensities. wghts = np.tile(wghts, tuple([line_intensities.shape[0]] + [1] * wghts.ndim)) # Weight line intensitites. # Weight emission in the bin containing the line intensity by 1-wght, # since by definition wght < 0.5 and # add the line intensity weighted by wght to the nearest neighbor bin. # This will mean the intensity integrated over the two bins is the # same as the original intensity, but the intensity-weighted peak energy # is the same as the original line peak energy even with different spectrum energy binning. new_line_intensities = line_intensities[:, deviation_indices] * (1 - wghts) neighbor_intensities = line_intensities[:, deviation_indices] * wghts neighbor_iline = iline[deviation_indices] + b return new_line_intensities, neighbor_intensities, neighbor_iline def _sanitize_inputs(energy_edges, temperature, emission_measure): if np.isscalar(energy_edges) or len(energy_edges) < 2 or energy_edges.ndim > 1: raise ValueError("energy_edges must be a 1-D astropy Quantity with length greater than 1.") # Add units regardless of whether or not the parameters # came with them attached. # If they were not already Quantities, the parameters get the default units. energy_edges <<= u.keV temperature <<= u.K emission_measure <<= u.cm**-3 energy_edges_keV = energy_edges.to(u.keV) temperature_K = temperature.to(u.K) if temperature.isscalar: temperature_K = np.array([temperature_K.value]) * temperature_K.unit emission_measure = emission_measure.to(u.cm**-3) if emission_measure.isscalar: emission_measure = np.array([emission_measure.value]) * emission_measure.unit return energy_edges_keV, temperature_K, emission_measure def _error_if_input_outside_valid_range(input_values, grid_range, param_name, param_unit): if input_values.min() < grid_range[0] or input_values.max() > grid_range[1]: if param_name == "temperature": message_unit = "MK" grid_range = u.Quantity(grid_range, unit=param_unit).to_value(message_unit) param_unit = message_unit message = ( f"All input {param_name} values must be within the range {grid_range[0]}--{grid_range[1]} {param_unit}. " ) raise ValueError(message) def _warn_if_input_outside_valid_range(input_values, grid_range, param_name, param_unit): if input_values.min() < grid_range[0] or input_values.max() > grid_range[1]: message = ( f"Some input {param_name} values outside valid range of " f"{grid_range[0]}--{grid_range[1]} {param_unit}. " "Flux will be zero outside this range." ) warnings.warn(message) def _error_if_low_energy_input_outside_valid_range(input_values, grid_range, param_name, param_unit): if input_values.min() < grid_range[0]: message = ( f"Lower bound of the input {param_name} must be within the range " f"{grid_range[0]}--{grid_range[1]} {param_unit}. " ) raise ValueError(message) def _initialize_abundances(abundances: Column) -> tuple[float]: """ From a given abundance Column from an abundance astropy Table, compute the values of logarithmic elemental abundances used in other portions of the module. """ # We don't need a copy to the abundance column here # because we are only reading from it, not writing to it. mg = 12 + np.log10(abundances[11]) al = 12 + np.log10(abundances[12]) si = 12 + np.log10(abundances[13]) s = 12 + np.log10(abundances[15]) ar = 12 + np.log10(abundances[17]) ca = 12 + np.log10(abundances[19]) fe = 12 + np.log10(abundances[25]) return (mg, al, si, s, ar, ca, fe) def _calculate_abundances(abundance_type, mg, al, si, s, ar, ca, fe): abundances = DEFAULT_ABUNDANCES[abundance_type].data.copy() if np.shape(mg) == (1,): abundances[11] = 10 ** (mg[0] - 12) abundances[12] = 10 ** (al[0] - 12) abundances[13] = 10 ** (si[0] - 12) abundances[15] = 10 ** (s[0] - 12) abundances[17] = 10 ** (ar[0] - 12) abundances[19] = 10 ** (ca[0] - 12) abundances[25] = 10 ** (fe[0] - 12) else: abundances[11] = 10 ** (mg - 12) abundances[12] = 10 ** (al - 12) abundances[13] = 10 ** (si - 12) abundances[15] = 10 ** (s - 12) abundances[17] = 10 ** (ar - 12) abundances[19] = 10 ** (ca - 12) abundances[25] = 10 ** (fe - 12) return abundances