""" JPAC amplitude for two pseudoscalars To calculate using the GPU (torch) -> FitWithGPU Or, to use the CPU (numpy, numexpr) -> FitWithCPU other utilities to calculate moments and phase motions are also included Calculate the amplitude used for simulation/prediction where production amplitudes are not fitted but constructed/read-from-a-file See also: V. Mathieu et al., PRD, 100, 054017 (2019) """ import copy from dataclasses import dataclass from typing import Tuple, Dict import numexpr as ne import numpy as np import pandas as pd import scipy.special import scipy.special import torch as tc from PyPWA import NestedFunction def phase_diff(params, wave1, wave2): value = params[::2] + 1j * params[1::2] return np.arctan( np.imag(value[wave1] * value[wave2].conjugate()) / np.real(value[wave1] * value[wave2].conjugate()) ) def calculate_moments(_s0, _p0, _p1, _d0, _d1, _d2): h100 = ne.evaluate("2.*(abs(_s0)**2+abs(_p0)**2+abs(_d0)**2)") h000 = ne.evaluate("h100+2.*(abs(_p1)**2+abs(_d1)**2+abs(_d2)**2)") h110 = ne.evaluate( "(8./sqrt(15))*real(_p0*conj(_d0))" "+ (4./sqrt(3.))*real(_s0*conj(_p0))" ) h010 = ne.evaluate("h110+(4/sqrt(5.))*real(_p1*conj(_d1))") h111 = ne.evaluate( "(2./sqrt(5.))*real(_p0*conj(_d1))" "- (2./sqrt(15.))*real(_p1*conj(_d0))" "+ (2./sqrt(3.))*real(_s0*conj(_p1))" ) h011 = ne.evaluate("h111+(2.*sqrt(2./5.))*real(_p1*conj(_d2))") h120 = ne.evaluate( "(4./5.)*abs(_p0)**2+(4./7.)*abs(_d0)**2" "+ (4./sqrt(5.))*real(_s0*conj(_d0))" ) h020 = ne.evaluate( "h120-(2./5.)*abs(_p1)**2+(2./7.)" "* abs(_d1)**2-(4./7.)*abs(_d2)**2" ) h121 = ne.evaluate( "(2./sqrt(5.))*real(_s0*conj(_d1))" "+ (2.*sqrt(3)/5.)*real(_p0*conj(_p1))" "+ (2./7.)*real(_d0*conj(_d1))" ) h021 = ne.evaluate("h121+(2./7.)*sqrt(6.)*real(_d1*conj(_d2))") h022 = ne.evaluate( "(2./sqrt(5))*real(_s0*conj(_d2))" "- (4./7.)*real(_d0*conj(_d2))" ) h122 = ne.evaluate( "h022+(sqrt(6.)/7.)*abs(_d1)**2" "+ (sqrt(6.)/5.)*abs(_p1)**2" ) rat = ne.evaluate( "(abs(_s0)**2+abs(_p0)**2+abs(_d0)**2)" "/ (abs(_p1)**2+abs(_d1)**2+abs(_d2)**2)" ) sigma4 = rat / (1 + rat) _rat2 = ne.evaluate( "(2./3.)*abs(_s0)**2+(5./6.)*abs(_d0)**2+(5./4.)*abs(_d2)**2" "- (2.*sqrt(5.)/3.)*real(_s0*conj(_d0))" "- (sqrt(10./3))*real(_s0*conj(_d2))" "+ (5./sqrt(6.))*real(_d0*conj(_d2))" ) sigma_y = ne.evaluate("(_rat2-abs(_p1)**2)/(_rat2+abs(_p1)**2)") return ( h000, h010, h011, h020, h021, h022, h100, h110, h111, h120, h121, h122, sigma4, sigma_y ) def make_elm(params): """ Extract (read externally) set of quantum numbers (e=reflectivity, l,m) """ length = int(len(params) / 2) # Produce ELM Dataframe elm = np.empty(length, [("e", int), ("l", int), ("m", int)]) # Extract ELM values from minuit parameters values = [] for param in params[::2]: values.append([int(value) for value in param.split(".")[1:]]) # Recent changes in how Numpy works means I can't pass values # to arrays like I used to, unfortunately. for i in range(length): for j in range(3): elm[i][j] = values[i][j] return elm def produce_specific_decay(theta, phi, m_waves, l_waves): """ Calculate Decays directly from Spherical Harmonics (two pseudoscalars) """ theta[theta < 0] = theta[theta < 0] + 2 * np.pi decay = np.empty((len(m_waves), len(theta)), "c16") for index, (m, l) in enumerate(zip(m_waves, l_waves)): decay[index] = scipy.special.sph_harm(m, l, theta, phi) return decay class PredictionWrapper(NestedFunction): def __init__(self, amplitude): super(PredictionWrapper, self).__init__() self.__amplitude = amplitude self.USE_MP = amplitude.USE_MP self.USE_TORCH = amplitude.USE_TORCH self.USE_THREADS = amplitude.USE_THREADS self.DEBUG = amplitude.DEBUG def setup(self, data): return self.__amplitude.setup(data) def calculate(self, parameters: Dict[str, float]) -> np.ndarray: values = np.empty(len(parameters)) for i, (key, value) in enumerate(parameters.items()): values[i] = value return self.__amplitude.calculate(values) class FitWithGPU(NestedFunction): """ Calculate Amplitude for GPU """ USE_TORCH = True USE_MP = False def __init__(self, initial_params): super(FitWithGPU, self).__init__() self.device = tc.device('cpu') self.__alpha = tc.Tensor([]) self.__pol = tc.Tensor([]) self.__phi = np.array([]) self.__theta = np.array([]) self.__decay = tc.Tensor([]) elm = make_elm(initial_params) self.__e = tc.from_numpy(elm['e']) self.__l = elm['l'] self.__m = elm['m'] def setup(self, data): # Handle Torch Devices based on USE_MP status if not self.USE_MP: self.device = tc.device("cuda:0") self.__alpha = tc.from_numpy(data['alpha']).to(self.device) self.__pol = tc.from_numpy(data['pol']).to(self.device) self.__e = self.__e.to(self.device) self.__decay = tc.from_numpy(produce_specific_decay( data['phi'], data['theta'], self.__m, self.__l )).to(self.device) return self def calculate(self, params): """ Calculate Total Intensity """ vs = params[::2] + 1j * params[1::2] vs = tc.from_numpy(vs).to(self.device) v = vs * self.__decay.T v_conj = vs * tc.conj(self.__decay).T u110 = v.T[self.__e == 1].sum(0) u120 = v_conj.T[self.__e == 1].sum(0) u111 = v.T[self.__e == -1].sum(0) u121 = v_conj.T[self.__e == -1].sum(0) return self.__compute_intensity(u110, u120, u111, u121) def calculate_wave(self, params, wave): """ Calculate Partial Intensity (in each wave) """ vs = params[::2] + 1j * params[1::2] vs = tc.from_numpy(vs).to(self.device) v = vs[wave] * self.__decay[wave] v_conj = vs[wave] * tc.conj(self.__decay)[wave] u110 = v[self.__e[wave] == 1].sum(0) u120 = v_conj[self.__e[wave] == 1].sum(0) u111 = v[self.__e[wave] == -1].sum(0) u121 = v_conj[self.__e[wave] == -1].sum(0) return self.__compute_intensity( u110, u120, u111, u121 ).cpu().detach().numpy() def __compute_intensity( self, u110: tc.Tensor, u120: tc.Tensor, u111: tc.Tensor, u121: tc.Tensor) -> tc.Tensor: i0 = ( u110 * tc.conj(u110) + u120 * tc.conj(u120) + u111 * tc.conj(u111) + u121 * tc.conj(u121) ).real i1 = 2 * (-1 * (u110 * tc.conj(u120)).real + (u111 * tc.conj(u121)).real) i2 = 2 * (-1 * (u110 * tc.conj(u120)).imag + (u111 * tc.conj(u121)).imag) intensity = i0 - self.__pol * i1 * tc.cos(2 * self.__alpha) intensity -= self.__pol * i2 * tc.sin(2 * self.__alpha) #intensity=i0 return intensity.real @staticmethod def calculate_moments(params): s0, p0, p1, d0, d1, d2 = (params[::2] + 1j * params[1::2])[:6] return calculate_moments(s0, p0, p1, d0, d1, d2) def get_elm(self): return pd.DataFrame({"e": self.__e.cpu(), "l": self.__l, "m": self.__m}) class FitWithCPU(NestedFunction): """ Calculate Amplitude for CPU """ def __init__(self, initial_params): super(FitWithCPU, self).__init__() self.__alpha = np.array([]) self.__pol = np.array([]) self.__phi = np.array([]) self.__theta = np.array([]) self.__decay = np.array([]) elm = make_elm(initial_params) self.__e = elm['e'] self.__l = elm['l'] self.__m = elm['m'] def setup(self, data): self.__alpha = data['alpha'] self.__pol = data['pol'] self.__decay = produce_specific_decay( data['phi'], data['theta'], self.__m, self.__l ) return self def calculate(self, params): """ Calculate Total Intensity """ vs = params[::2] + 1j * params[1::2] v = vs * self.__decay.T v_conj = vs * self.__decay.conj().T u110 = v.T[self.__e == 1].sum(0) u120 = v_conj.T[self.__e == 1].sum(0) u111 = v.T[self.__e == -1].sum(0) u121 = v_conj.T[self.__e == -1].sum(0) return self.__compute_intensity(u110, u120, u111, u121) def calculate_wave(self, params, wave): """ Calculate Partial Intensity (in each wave) """ vs = params[::2] + 1j * params[1::2] v = vs[wave] * self.__decay[wave] v_conj = vs[wave] * self.__decay.conj()[wave] u110 = v[self.__e[wave] == 1].sum(0) u120 = v_conj[self.__e[wave] == 1].sum(0) u111 = v[self.__e[wave] == -1].sum(0) u121 = v_conj[self.__e[wave] == -1].sum(0) return self.__compute_intensity(u110, u120, u111, u121) def __compute_intensity( self, u110: np.ndarray, u120: np.ndarray, u111: np.ndarray, u121: np.ndarray) -> np.ndarray: i0 = ( u110 * u110.conj() + u120 * u120.conj() + u111 * u111.conj() + u121 * u121.conj() ).real i1 = 2 * (-1 * (u110 * u120.conj()).real + (u111 * u121.conj()).real) i2 = 2 * (-1 * (u110 * u120.conj()).imag + (u111 * u121.conj()).imag) intensity = i0 - self.__pol * i1 * np.cos(2 * self.__alpha) intensity -= self.__pol * i2 * np.sin(2 * self.__alpha) # intensity = i0 return intensity.real @staticmethod def calculate_moments(params): s0, p0, p1, d0, d1, d2 = (params[::2] + 1j * params[1::2])[:6] return calculate_moments(s0, p0, p1, d0, d1, d2) def get_elm(self): return pd.DataFrame({"e": self.__e, "l": self.__l, "m": self.__m}) class Simulate(NestedFunction): """ Calculate Intensity for simulation """ @dataclass class Resonances: cr: np.ndarray w0: np.ndarray r0: np.ndarray wave: np.ndarray e: np.ndarray l: np.ndarray m: np.ndarray def __init__(self): super(Simulate, self).__init__() self.__alpha = np.array([]) self.__pol = np.array([]) self.__phi = np.array([]) self.__theta = np.array([]) self.__decay = np.array([]) self.__mass = np.array([]) self.__vs = np.array([]) self.__res = None # type: Simulate.Resonances def load_resonances_from_uri(self, elm_file: str, res_file: str): resonances = pd.read_csv(res_file) elms = pd.read_csv(elm_file) self.__set_resonances( resonances, elms["E"].to_numpy(), elms["L"].to_numpy(), elms["M"].to_numpy() ) return self def load_resonances(self, cr, w0, r0, e_wave, l_wave, m_wave): df = pd.DataFrame({"Cr": cr, "W0": w0, "R0": r0}) self.__set_resonances(df, e_wave, l_wave, m_wave) return self def __set_resonances( self, resonances: pd.DataFrame, e_wave: np.ndarray, l_wave: np.ndarray, m_wave: np.ndarray ): wave = np.empty([len(resonances), len(e_wave)], "f4") for index in range(len(resonances)): for w_index in range(len(e_wave)): wave[index][w_index] = resonances[f"Wave{w_index}"][index] self.__res = self.Resonances( resonances["Cr"].to_numpy(), resonances["W0"].to_numpy(), resonances["R0"].to_numpy(), wave, e_wave, l_wave, m_wave ) def setup(self, data): self.__pol = data["pol"] self.__alpha = data["alpha"] self.__mass = data["mass"] self.__vs = self.__make_vs() self.__decay = produce_specific_decay( data['phi'], data['theta'], self.__res.m, self.__res.l ) # Returning self allows us to chain method calls return self def __make_vs(self): vs = np.empty( [len(self.__res.cr), len(self.__res.e), len(self.__pol)], "c16" ) for ri in range(len(self.__res.cr)): for wi in range(len(self.__res.e)): _cr = self.__res.cr[ri] _wave = self.__res.wave[ri][wi] _w0 = self.__res.w0[ri] _r0 = self.__res.r0[ri] _mass = self.__mass frac = ne.evaluate( "(sqrt(_cr * _wave)*_w0*_r0)" "/ ((_w0**2 - _mass**2)**2 + _w0**2 * _r0**2)" ) complex_part = ne.evaluate( "(_w0**2 - _mass**2) + 1j*(_w0 * _r0)" ) vs[ri][wi] = frac * complex_part return vs def calculate(self, *_): """ Calculate the total intensity """ u110 = np.zeros(len(self.__pol), "c16") u111 = np.zeros(len(self.__pol), "c16") u120 = np.zeros(len(self.__pol), "c16") u121 = np.zeros(len(self.__pol), "c16") for res in range(len(self.__res.cr)): v = self.__vs[res] * self.__decay v_conj = self.__vs[res] * self.__decay.conj() u110 += v[self.__res.e == 1].sum(0) u120 += v_conj[self.__res.e == 1].sum(0) u111 += v[self.__res.e == -1].sum(0) u121 += v_conj[self.__res.e == -1].sum(0) return self.__compute_intensity(u110, u120, u111, u121)[0] def calculate_partials(self): u110 = np.zeros(len(self.__pol), "c16") u111 = np.zeros(len(self.__pol), "c16") u120 = np.zeros(len(self.__pol), "c16") u121 = np.zeros(len(self.__pol), "c16") for r1 in range(len(self.__res.cr)): for r2 in range(len(self.__res.cr)): for w1 in range(len(self.__res.e)): for w2 in range(len(self.__res.e)): vs = self.__vs[r1][w1] * self.__vs[r2][w2].conj() # Will enable or disable interactions without branching s1 = self.__res.e[w1] == 1 and self.__res.e[w2] == 1 s2 = self.__res.e[w1] == -1 and self.__res.e[w2] == -1 u110 += s1 * vs * self.__decay[w1] * self.__decay[w2] u120 += s1 * vs * self.__decay[w1] * self.__decay[w2].conj() u111 += s2 * vs * self.__decay[w1] * self.__decay[w2] u121 += s2 * vs * self.__decay[w1] * self.__decay[w2].conj() return self.__compute_intensity(u110, u120, u111, u121)[1:] def __compute_intensity( self, u110: np.ndarray, u120: np.ndarray, u111: np.ndarray, u121: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: i0 = ( u110 * u110.conj() + u120 * u120.conj() + u111 * u111.conj() + u121 * u121.conj() ).real i1 = 2 * (-1 * (u110 * u120.conj()).real + (u111 * u121.conj()).real) i2 = 2 * (-1 * (u110 * u120.conj()).imag + (u111 * u121.conj()).imag) intensity = i0 - self.__pol * i1 * np.cos(2 * self.__alpha) intensity -= self.__pol * i2 * np.sin(2 * self.__alpha) # intensity = i0 if np.sum(np.imag(intensity)) > 1.e-6: print("Imaginary values have been found!") return intensity.real, i0, i1 def calculate_moments(self): s0, p0, _, p1, d0, _, d1, _, d2 = self.vs.sum(0) return calculate_moments(s0, p0, p1, d0, d1, d2) @property def vs(self): return self.__vs.copy() @vs.setter def vs(self, variable): if len(variable) != len(self.__vs): raise ValueError(f"Must be {len(self.__vs)} length!") else: self.__vs = variable.copy() @property def decay(self): return self.__decay.copy() @property def resonance(self): return copy.deepcopy(self.__res)