diff --git a/docsrc/API_docs.rst b/docsrc/API_docs.rst index 2c625c4..c6c4340 100644 --- a/docsrc/API_docs.rst +++ b/docsrc/API_docs.rst @@ -36,6 +36,7 @@ Sequences pyepr.sequences.CarrPurcellSequence pyepr.sequences.ResonatorProfileSequence pyepr.sequences.TWTProfileSequence + pyepr.sequences.T1InversionRecoverySequence Pulses ~~~~~~ @@ -90,6 +91,7 @@ I/O pyepr.dataset.create_dataset_from_sequence pyepr.dataset.create_dataset_from_axes pyepr.dataset.create_dataset_from_bruker + pyepr.dataset.downconvert_dataset Utilities ~~~~~~~~~ diff --git a/docsrc/conf.py b/docsrc/conf.py index f0a7cec..5524a89 100644 --- a/docsrc/conf.py +++ b/docsrc/conf.py @@ -11,6 +11,8 @@ from pyepr import __version__, __copyright__ sys.path.insert(0, os.path.abspath('..')) +import matplotlib +matplotlib.use('Agg') # Use non-interactive backend project = 'PyEPR' copyright = __copyright__ @@ -30,7 +32,10 @@ 'sphinx_copybutton', 'numpydoc', 'sphinx_favicon', - 'sphinx_gallery.gen_gallery'] + 'sphinx_gallery.gen_gallery', + 'matplotlib.sphinxext.plot_directive', + 'sphinx.ext.imgmath' +] templates_path = ['_templates'] exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] @@ -41,6 +46,10 @@ autodoc_typehints = "description" autoapi_template_dir = "_templates/autoapi" +plot_include_source = True +plot_html_show_source_code = False +plot_formats = [('png', 100)] + autoapi_keep_files = True autoapi_add_toctree_entry = False autoapi_python_class_content= "both" diff --git a/docsrc/install.rst b/docsrc/install.rst index e431c7a..f10a207 100644 --- a/docsrc/install.rst +++ b/docsrc/install.rst @@ -38,5 +38,4 @@ PyEPR requires: - h5netcdf - toml - deerlab (https://github.com/JeschkeLab/DeerLab) - - numba - psutil diff --git a/docsrc/releasenotes.rst b/docsrc/releasenotes.rst index 1c3a3be..9070159 100644 --- a/docsrc/releasenotes.rst +++ b/docsrc/releasenotes.rst @@ -1,6 +1,19 @@ Release Notes ============= +Version 1.1 (2026-05-27): +++++++++++++++++++++++++++++ +- Added `AmplifierLinearityAnalysis` class for characterizing amplifier non-linearity. +- Added `T1InversionRecovery` sequence. +- Added rise time to linear chirp pulses. +- Added right based arithmatic. +- Fixed Version Detection Bug +- Fixed Dependency issues +- Improved Documentation +- Removed Numba dependency + + + Version 1.0.0 (2025-09-12): ++++++++++++++++++++++++++++ - All references to `LO` have been changed to `freq` in the frequency object and related. diff --git a/docsrc/tutorial_sequencer.md b/docsrc/tutorial_sequencer.md index a78d50c..784cbf3 100644 --- a/docsrc/tutorial_sequencer.md +++ b/docsrc/tutorial_sequencer.md @@ -1,6 +1,6 @@ # Pulse Sequencer -PyEPR provides an intuitive object-oriented pulse programmer allowing the user to design pulsesequences in a hardware-agnostic manner. Additionally, several common EPR experiments are pre-defined and can be easily instantiated and modified. +PyEPR provides an intuitive object-oriented pulse programmer allowing the user to design pulse sequences in a hardware-agnostic manner. Additionally, several common EPR experiments are pre-defined and can be easily instantiated and modified. PyEPR uses ns, GHz and G as the default time, frequency and field units. Very occasionally, other units such as µs or MHz are used, in which case it will be explicitly mentioned. @@ -31,33 +31,33 @@ These pulses will eventually need a scale (amplitude), before the sequence can b A Detection window is also created ```python p90 = epr.RectPulse(tp=16, - freq=0, # Frequency offset in MHz, w.r.t the sequence frequency, + freq=0, # Frequency offset in GHz, w.r.t the sequence frequency, flipangle=np.pi/2, # Flip angle in degrees pcyc = {"phases":[0, np.pi], "dets":[1,-1]} ) p180 = epr.RectPulse(tp=32, - freq=0, # Frequency offset in MHz, w.r.t the sequence frequency, + freq=0, # Frequency offset in GHz, w.r.t the sequence frequency, flipangle=np.pi, # Flip angle in degrees ) -det = epr.Detetction(tp=32, - freq=0, # Frequency offset in MHz, w.r.t the sequence frequency, +det = epr.Detection(tp=32, + freq=0, # Frequency offset in GHz, w.r.t the sequence frequency, ) ``` We now need a time axis for our sequence and to add them to the sequence object. -When a pulse is copied into the sequence using the `add_pulse` method, parameters can be modified allowing the same pulse can be used multiple times with different timings or amplitudes. +When a pulse is copied into the sequence using the `addPulse` method, parameters can be modified allowing the same pulse can be used multiple times with different timings or amplitudes. ```python t = epr.Parameter(name='Interpulse Delay', value=400, # Initial interpulse delay in ns step=8, # Step size in ns - dim=1024 # Number of points, - unit='ns' # Unit of the parameter + dim=1024, # Number of points, + unit='ns', # Unit of the parameter description='Interpulse delay between the pi/2 and pi pulse' ) # Adding the pulses to the sequence -seq.add_pulse(p90.copy(t=0)) -seq.add_pulse(p180.copy(t=t)) -seq.add_pulse(det.copy(t=2*t)) +seq.addPulse(p90.copy(t=0)) +seq.addPulse(p180.copy(t=t)) +seq.addPulse(det.copy(t=2*t)) # Defining the evolution seq.evolution([t]) @@ -81,7 +81,7 @@ HE_Seq = epr.HahnEchoRelaxationSequence( shots = 20, # Number of shots per point start = 400, # Initial interpulse delay in ns step = 8, # Step size in ns - dim = 1024 # Number of points + dim = 1024, # Number of points pi2_pulse = p90, # The 90 degree pulse pi_pulse = p180 # The 180 degree pulse ) diff --git a/pyepr/classes.py b/pyepr/classes.py index 4a20401..b0df3bf 100644 --- a/pyepr/classes.py +++ b/pyepr/classes.py @@ -23,11 +23,19 @@ class Interface: """ def __init__(self,config_file:dict=None,log=None) -> None: + """ + Parameters + ---------- + config_file : dict or str or Path, optional + The configuration file or dict for the spectrometer interface, by default None. If None, a default configuration will be used. + log : logging.Logger, optional + The logger to be used, by default None. If None, a default logger will be created. + """ if isinstance(config_file, (str,Path)): with open(config_file, 'r') as f: config_file = yaml.safe_load(f) - self.config = config_file if isinstance(config_file, dict) else {} + self.config = config_file if isinstance(config_file, dict) else {"Spectrometer":{"Bridge":{}}} self.pulses = {} self.savefolder = str(Path.home()) @@ -37,7 +45,10 @@ def __init__(self,config_file:dict=None,log=None) -> None: else: self.log = log self.resonator = None - self.amp_nonlinearity = self.config["Spectrometer"]["Bridge"].get('Amplifier Non-Linearity',None) + if self.config != {}: + self.amp_nonlinearity = self.config["Spectrometer"]["Bridge"].get('Amplifier Non-Linearity',None) + else: + self.amp_nonlinearity = None pass def connect(self) -> None: @@ -153,7 +164,8 @@ def terminate_at(self, criterion, test_interval=2, keep_running=True, verbosity= data = self.acquire_dataset() if autosave: self.log.debug(f"Autosaving to {os.path.join(self.savefolder,self.savename)}") - data.to_netcdf(os.path.join(self.savefolder,self.savename),engine='h5netcdf',invalid_netcdf=True) + # data.to_netcdf(os.path.join(self.savefolder,self.savename),engine='h5netcdf',invalid_netcdf=True) + data.epr.save(os.path.join(self.savefolder,self.savename)) try: # nAvgs = data.num_scans.value @@ -278,7 +290,7 @@ def __init__(self, name, value, unit="", description="", virtual=False, self.value = value self.NUS = False # uniform sampling elif isinstance(value, np.ndarray): - self.value = np.median(value) + self.value = value[0] axis = value - self.value self.NUS = True # non-uniform sampling elif value is None: @@ -382,17 +394,22 @@ def adjust_step(self, waveform_precision, keep_dim=True): current_step =old_axis[1] - old_axis[0] # test if uniformally sampled if not np.allclose(np.diff(self.axis[i]["axis"]), current_step): - raise ValueError("This only works for uniformaly sampled data at the moment") - new_step = round_step(current_step, waveform_precision) - - if new_step == 0: - new_step = waveform_precision - - if keep_dim: - dim = old_axis.shape[0] - new_axis = np.arange(self.axis[i]["axis"][0], self.axis[i]["axis"][0]+new_step*dim, new_step) + tolerance = 1e-9 + new_axis = copy.deepcopy(old_axis) + remainders = np.abs(new_axis % waveform_precision) + not_multiples = ~(np.isclose(remainders, 0, atol=1e-9) | np.isclose(remainders, waveform_precision, atol=1e-9)) + new_axis[not_multiples] = np.round(new_axis[not_multiples] / waveform_precision) * waveform_precision else: - new_axis = np.arange(self.axis[i]["axis"][0], self.axis[i]["axis"][-1]+new_step, new_step) + new_step = round_step(current_step, waveform_precision) + + if new_step == 0: + new_step = waveform_precision + + if keep_dim: + dim = old_axis.shape[0] + new_axis = np.arange(self.axis[i]["axis"][0], self.axis[i]["axis"][0]+new_step*dim, new_step) + else: + new_axis = np.arange(self.axis[i]["axis"][0], self.axis[i]["axis"][-1]+new_step, new_step) self.axis[i]["axis"] = new_axis if isinstance(self.value, numbers.Number): @@ -496,6 +513,10 @@ def __add__(self, __o:object): raise RuntimeError( "Both parameters axis and the array must have the same shape") + def __radd__(self, __o:object): + return self.__add__(__o) + + def __sub__(self, __o:object): if type(__o) is Parameter: @@ -564,6 +585,9 @@ def __sub__(self, __o:object): raise RuntimeError( "Both parameters axis and the array must have the same shape") + def __rsub__(self, __o:object): + return self.__sub__(__o) + def __mul__(self, __o:object): if type(__o) is Parameter: if self.unit != __o.unit: diff --git a/pyepr/dataset.py b/pyepr/dataset.py index 868200e..1bf4c2f 100644 --- a/pyepr/dataset.py +++ b/pyepr/dataset.py @@ -102,7 +102,8 @@ def create_dataset_from_sequence(data, sequence: Sequence,extra_params={}): attr.update({'autoDEER_Version':__version__}) return xr.DataArray(data, dims=dims, coords=coords,attrs=attr) -def create_dataset_from_axes(data, axes, params: dict = {},axes_labels=None): +def create_dataset_from_axes(data, axes, params: dict = {},extra_coords:dict = None, + axes_labels=None): """ Create an xarray dataset from a numpy array and a list of axes. @@ -129,7 +130,9 @@ def create_dataset_from_axes(data, axes, params: dict = {},axes_labels=None): if not isinstance(axes, list): axes = [axes] coords = {default_labels.pop(0):a for a in axes} - params.update({'autoDEER_Version':__version__}) + if extra_coords is not None: + coords.update(extra_coords) + params.update({'PyEPR_Version':__version__}) return xr.DataArray(data, dims=dims, coords=coords, attrs=params) @@ -143,7 +146,7 @@ def create_dataset_from_bruker(filepath): labels = [] for i in range(ndims): ax_label = default_labels[i] - axis_string = params['DESC'][f'{ax_label}UNI'] + axis_string = params['DESC'].get(f'{ax_label}UNI',"") if "'" in axis_string: axis_string = axis_string.replace("'", "") if axis_string == 'G': @@ -162,11 +165,12 @@ def create_dataset_from_bruker(filepath): coords = {labels[i]:(default_labels[i],a) for i,a in enumerate(axes)} attr = {} - attr['LO'] = float(params['SPL']['MWFQ']) / 1e9 - attr['B'] = float(params['SPL']['B0VL']) * 1e4 - attr['reptime'] = float(params['DSL']['ftEpr']['ShotRepTime'].replace('us','')) - attr['nAvgs'] = int(params['DSL']['recorder']['NbScansAcc']) - attr['shots'] = int(params['DSL']['ftEpr']['ShotsPLoop']) + if 'DSL' in params: + attr['LO'] = float(params['SPL']['MWFQ']) / 1e9 + attr['B'] = float(params['SPL']['B0VL']) * 1e4 + attr['reptime'] = float(params['DSL']['ftEpr']['ShotRepTime'].replace('us','')) + attr['nAvgs'] = int(params['DSL']['recorder']['NbScansAcc']) + attr['shots'] = int(params['DSL']['ftEpr']['ShotsPLoop']) attr.update({'autoDEER_Version':__version__}) return xr.DataArray(data, dims=dims, coords=coords, attrs=attr) @@ -183,7 +187,7 @@ def save(self, filename, type='netCDF',overwrite=True): Parameters ---------- - filename : str + filename : str, file-like object The name of the file to save the dataset type : str, optional The type of file to save, by default 'netCDF' (including .h5) @@ -196,7 +200,7 @@ def save(self, filename, type='netCDF',overwrite=True): mode = "a" #if filename doesn't have the extension .h5, add it - if not filename.endswith('.h5'): + if isinstance(filename,str) and not filename.endswith('.h5'): filename = filename + '.h5' if 'Scan' in self._obj.dims: @@ -375,3 +379,108 @@ def merge(self,other,ignore_errors=True): +def downconvert_dataset(dataset, filter_type='boxcar',IF=None,reduce=True,sampling_rate=None,**kwargs): + """ + Downconvert a dataset to baseband using a filter + Parameters + ---------- + dataset : xr.DataArray + The dataset to downconvert + filter_type : str, optional + The type of filter to use, by default 'boxcar'. Other options are 'cheby' and a Pulse object + filter_width : float, optional + The width of the filter in MHz or ns depending on filter tyre, by default 20 ns for boxcar and 50 MHz for cheby. + IF : float, optional + The intermediate frequency to use, by default 0.15 + reduce : bool, optional + If True, the dataset is reduced to a single point, by default True + **kwargs : dict + Extra arguments to pass to the filter function + + Returns + ------- + xr.DataArray + The downconverted dataset + """ + + if IF is None: + if 'IF_freq' in dataset.attrs: + IF = dataset.attrs.get('IF_freq') # GHz + elif 'if_freq' in dataset.attrs: + IF = dataset.attrs.get('if_freq') # GHz + elif 'IFfreq' in dataset.attrs: + IF = dataset.attrs.get('IFfreq') # GHz + else: + raise ValueError('IFfreq not found in dataset attributes, please provide IF value') + + if sampling_rate is None: + if 'det_rate' in dataset.attrs: + sampling_rate = dataset.attrs.get('det_rate') # GHz + elif 'sampling_rate' in dataset.attrs: + sampling_rate = dataset.attrs.get('sampling_rate') # GHz + else: + raise ValueError('sampling_rate not found in dataset attributes, please provide sampling_rate value') + if filter_type is None: + dc_first=True + elif filter_type not in ['boxcar','cheby']: + filter_type = 'cheby' + filter_width = 50 + + + if filter_type == 'boxcar': + filter_width = kwargs.get('filter_width',20) + funct = lambda data: np.convolve(data,np.ones(filter_width),mode='same') + dc_first=True + elif isinstance(filter_type, ad_pulses.Pulse): # match filter to a epr Pulse + raise NotImplementedError('Match filter to a pulse not implemented yet') + elif filter_type == 'cheby': + from scipy.signal import cheby1,sosfiltfilt + filter_width = kwargs.get('filter_width',50) # MHz + filter_width /= 1e3 + + Wn = np.array([IF-filter_width,IF+filter_width]) + Wn[Wn<=0] = 0.001 + order = 5 + + a = cheby1(order,0.5,Wn,'bandpass',analog=False,fs=sampling_rate,output='sos') + funct = lambda data: sosfiltfilt(a,data) + dc_first=False + elif filter_type is None: + dc_first=True + + else: + raise ValueError('Filter not recognised') + + if dc_first: + data_array_dc = dataset*np.exp(-1j*2*np.pi*IF*dataset.tx/sampling_rate) + if filter_type is None: + return data_array_dc + data_array_dc.data = np.apply_along_axis(funct,-1,data_array_dc.data) + else: + data_array_dc = xr.apply_ufunc(funct,dataset) + data_array_dc = data_array_dc*np.exp(-1j*2*np.pi*IF*data_array_dc.tx/sampling_rate) + + # if data_array_dc.ndim == 3: + # max_echo_pos = np.unravel_index(np.argmax(np.abs(data_array_dc.data[0,0,20:-20])),data_array_dc.shape[-1])[0] + 20 + # else: + # max_echo_pos = np.unravel_index(np.argmax(np.abs(data_array_dc.data[0,20:-20])),data_array_dc.shape[-1])[0] + 20 + if reduce: + if 'max_echo_pos' in kwargs: + max_echo_pos = kwargs['max_echo_pos'] + else: + max_echo_pos = np.unravel_index(np.abs(data_array_dc).argmax(),data_array_dc.shape)[-1] + return data_array_dc[...,max_echo_pos] + else: + return data_array_dc + +def find_peak(dataset, freq,freq_axis=None,search_range=4): + if freq/1e3 < freq_axis.min() or freq/1e3 > freq_axis.max(): + return "N/A", "N/A" + + if freq_axis is None: + freq_axis = dataset.offset_frequency.values + n_points = dataset.shape[0] + loc = np.argmin(np.abs(freq_axis-freq/1e3)) + loc = loc-search_range + dataset.values[loc-search_range:loc+search_range].argmax() + peak = dataset[loc].values + return loc,peak diff --git a/pyepr/hardware/Bruker_AWG.py b/pyepr/hardware/Bruker_AWG.py index 52fac3f..e900e06 100644 --- a/pyepr/hardware/Bruker_AWG.py +++ b/pyepr/hardware/Bruker_AWG.py @@ -75,6 +75,17 @@ def __init__(self, config_file) -> None: def connect(self, d0=None) -> None: + """ + Connects to the spectrometer through the XeprAPI. Automatically sets + up the spectrometer for pulse experiments. + + Parameters + ---------- + d0: float, optional + The d0 value to be used. If None, the d0 will be calculated + upon connection. By default None. + + """ self.api.connect() @@ -84,17 +95,36 @@ def connect(self, d0=None) -> None: return super().connect() def setup(self,d0=None): + """ + Sets up the spectrometer for pulse experiments. + The video bandwidth is read from the configuration file, and the timebase + is set accordingly. + + Parameters + ---------- + d0: float, optional + The d0 value to be used. If None, the d0 will be calculated + upon connection. By default None. + + """ + # Check if needs to switch to pulse mode, only if needed as slow + if not self.api.hidden['BrPlsMode'].value: + hw_log.info('Switching to Pulse Mode') + temp_mw_amp = self.api.get_MW_amp() + + self.api.hidden['BrPlsMode'].value = True # Turns the mw (video) amplifier on + time.sleep(3) # Wait for bridge to actually switch to pulse mode + self.api.set_MW_amp(temp_mw_amp) - self.api.hidden['BrPlsMode'].value = True self.api.hidden['OpMode'].value = 'Operate' # self.api.hidden['RefArm'].value = 'On' - # TODO add detection of VAMP-III model video_bw = self.bridge_config.get('Video BW') # self.api.cur_exp['VideoBW'].value = 20 self.time_base = 1/(video_bw*2e-3) self.api.hidden['specJet.TimeBase'].value = self.time_base + if d0 is None: @@ -282,13 +312,14 @@ def launch(self, sequence: Sequence, savename: str, start=True, tune=True, def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): - """Generates a rectangular pi and pi/2 pulse of the given length at - the given field position. This value is stored in the pulse cache. + """ + Generates a rectangular pi/2 and pi pulse at the given frequency and field. + The pulses are of equal amplitude ($t_p$ for pi/2 and $2*t_p$ for pi) and are tuned using a Hahn echo sequence. Parameters ---------- tp : float - Pulse length in ns + $pi/2$ Pulse length in ns freq : float Central frequency of this pulse in GHz B : float @@ -303,7 +334,7 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): p90: RectPulse A tuned rectangular pi/2 pulse of length tp p180: RectPulse - A tuned rectangular pi pulse of length tp + A tuned rectangular pi pulse of length 2*tp """ time.sleep(5) amp_tune =HahnEchoSequence( @@ -335,7 +366,7 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): data = np.abs(dataset.data) scale_amp = np.around(dataset.pulse0_scale[data.argmax()].data,2) if scale_amp > 0.95: - raise RuntimeError("Not enough power avaliable.") + raise RuntimeError("Not enough power available.") if scale_amp == 0: warnings.warn("Pulse tuned with a scale of zero!") diff --git a/pyepr/hardware/Bruker_MPFU.py b/pyepr/hardware/Bruker_MPFU.py index 973a0fe..2f50c48 100644 --- a/pyepr/hardware/Bruker_MPFU.py +++ b/pyepr/hardware/Bruker_MPFU.py @@ -957,7 +957,7 @@ def ELDORtune(interface, sequence, freq, MPFU=True, for i,x in enumerate(atten_axis): interface.api.set_attenuator('ELDOR',x) # Set phase to value time.sleep(1) - data[i] = np.trapz(get_specjet_data(interface)) + data[i] = np.trapezoid(get_specjet_data(interface)) data = correctphase(data) if data[np.abs(data).argmax()].max() < 1: diff --git a/pyepr/hardware/Bruker_tools.py b/pyepr/hardware/Bruker_tools.py index 7d419ab..cc945da 100644 --- a/pyepr/hardware/Bruker_tools.py +++ b/pyepr/hardware/Bruker_tools.py @@ -1287,9 +1287,7 @@ def write_pulsespel_file(sequence,d0, AWG=False, MPFU=False,MaxGate=40): pcyc_hash = pcyc.pcyc_hash pcyc_str = pcyc.__str__() else: - pcyc = PSPhaseCycle(sequence, MPFU) - pcyc_hash = pcyc.pcyc_hash - pcyc_str = pcyc.__str__() + raise ValueError("Either AWG must be True or MPFU must be supplied") # Add Pulse Sequence pulse_str = "d9\n" for id, pulse in enumerate(sequence.pulses): diff --git a/pyepr/hardware/ETH_awg.py b/pyepr/hardware/ETH_awg.py index 3604529..ad1b653 100644 --- a/pyepr/hardware/ETH_awg.py +++ b/pyepr/hardware/ETH_awg.py @@ -141,7 +141,7 @@ def acquire_dataset_from_matlab(self, verbosity=0,**kwargs): # filename = cur_exp['savename'] # files = os.listdir(folder_path) - curexpname = self.workspace['currexpname'] + curexpname = self.workspace['currfilename'] # def extract_date_time(str): # output = re.findall(r"(\d{8})_(\d{4})", str) # if output != []: @@ -255,7 +255,7 @@ def get_buffer(self, verbosity=0,**kwargs): def read_dataset(self, verbosity=0,savenow=False, **kwargs): cur_exp = self.workspace['currexp'] folder_path = cur_exp['savepath'] - curexpname = self.workspace['currexpname'] + curexpname = self.workspace['currfilename'] path = folder_path + "\\" + curexpname + ".mat" @@ -459,7 +459,7 @@ def launch_long(self, sequence , savename: str, IFgain: int = 0, axID=-1): def isrunning(self) -> bool: if self.bg_thread is None: # state = bool(self.engine.dig_interface('savenow')) - _,_,_,state = self.engine.dig_interface('progress', nargout=4) + _,_,_,state,_ = self.engine.dig_interface('progress', nargout=5) if state == 0: state = False @@ -473,14 +473,14 @@ def isrunning(self) -> bool: state= self.bg_thread.is_alive() return state - def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): + def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400,step=0.02, dim=45): """Generates a rectangular pi and pi/2 pulse of the given length at the given field position. This value is stored in the pulse cache. Parameters ---------- tp : float - Pulse length of pi/2 pulse in ns + $pi/2$ Pulse length in ns freq : float Central frequency of this pulse in GHz B : float @@ -489,20 +489,26 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): Shot repetion time in us. shots: int The number of shots + step: float + The step size for the amplitude tuning, default 0.02 + dim: int + The dimension of the amplitude axis, default 45 Returns ------- p90: RectPulse A tuned rectangular pi/2 pulse of length tp p180: RectPulse - A tuned rectangular pi pulse of length tp + A tuned rectangular pi pulse of length 2*tp """ - amp_tune =HahnEchoSequence( B=B, freq=freq, reptime=reptime, averages=1, shots=shots ) - scale = Parameter("scale",0,dim=45,step=0.02) + max_val = step * (dim -1) + if max_val > 1.0: + raise ValueError("Step and dim values result in scale > 1.0") + scale = Parameter("scale",0,dim=dim,step=step) amp_tune.pulses[0].tp.value = tp amp_tune.pulses[0].scale = scale amp_tune.pulses[1].tp.value = tp * 2 @@ -520,7 +526,7 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): data = np.abs(dataset.data) scale = np.around(dataset.pulse0_scale[data.argmax()].data,2) if scale > 0.9: - raise RuntimeError("Not enough power avaliable.") + raise RuntimeError("Not enough power available.") if scale == 0: warnings.warn("Pulse tuned with a scale of zero!") @@ -533,7 +539,7 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): return p90, p180 - def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): + def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400,step=0.02, dim=45): """Tunes a single pulse a range of methods. Parameters @@ -550,6 +556,10 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): Shot repetion time in us. shots: int The number of shots + step: float + The step size for the amplitude tuning, default 0.02 + dim: int + The dimension of the amplitude axis, default 45 Returns ------- @@ -570,9 +580,13 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): elif hasattr(pulse, "final_freq") & hasattr(pulse, "BW"): c_frq = pulse.final_freq.value - 0.5*pulse.BW.value + freq elif hasattr(pulse, "init_freq") & hasattr(pulse, "final_freq"): - c_frq = 0.5*(pulse.final_freq.value + pulse.final_freq.value) + freq + c_frq = 0.5*(pulse.init_freq.value + pulse.final_freq.value) + freq # Find rect pulses + max_val = step * (dim -1) + if max_val > 1.0: + raise ValueError("Step and dim values result in scale > 1.0") + if mode == "amp_hahn": if pulse.flipangle.value == np.pi: tp = pulse.tp.value / 2 @@ -586,7 +600,7 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): pi2_pulse = pulse, pi_pulse=pi_pulse ) - scale = Parameter('scale',0,unit=None,step=0.02, dim=45, description='The amplitude of the pulse 0-1') + scale = Parameter('scale',0,unit=None,step=step, dim=dim, description='The amplitude of the pulse 0-1') amp_tune.pulses[0].scale = scale amp_tune.evolution([scale]) @@ -601,7 +615,7 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): new_amp = np.around(dataset.pulse0_scale[dataset.data.argmax()].data,2) if new_amp > 0.9: - raise RuntimeError("Not enough power avaliable.") + raise RuntimeError("Not enough power available.") if new_amp == 0: warnings.warn("Pulse tuned with a scale of zero!") @@ -626,7 +640,7 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): freq=c_frq-freq)) nut_tune.addPulse(Detection(t=3e3, tp=512, freq=c_frq-freq)) - scale = Parameter('scale',0,unit=None,step=0.02, dim=45, description='The amplitude of the pulse 0-1') + scale = Parameter('scale',0,unit=None,step=step, dim=dim, description='The amplitude of the pulse 0-1') nut_tune.pulses[0].scale = scale nut_tune.evolution([scale]) @@ -694,7 +708,7 @@ def tune(self,*, sequence=None, mode="amp_hahn", freq=None, gyro=None): dataset = self.read_dataset() scale = np.around(dataset.pulse0_scale[dataset.data.argmax()].data,2) if scale > 0.9: - raise RuntimeError("Not enough power avaliable.") + raise RuntimeError("Not enough power available.") self.pulses[f"p90_{tp}"] = amp_tune.pulses[0].copy( scale=scale, freq=0) @@ -803,6 +817,7 @@ def _build_pulse(self, pulse) -> dict: elif type(pulse) is ChirpPulse: event["pulsedef"]["type"] = 'chirp' + event["pulsedef"]["t_rise"] = float(pulse.rise_time.value) if hasattr(pulse, "init_freq"): event["pulsedef"]["nu_init"] = pulse.init_freq.value +\ @@ -1091,4 +1106,4 @@ def set_params_by_uuid(new_seq, progTable,uuid,index): interface.bg_data.attrs['diglevel'] = dig_level interface.bg_data.attrs['nAvgs'] = iavg+1 - print("Background thread finished") \ No newline at end of file + print("Background thread finished") diff --git a/pyepr/hardware/ETH_awg_load.py b/pyepr/hardware/ETH_awg_load.py index 39aed04..ef52432 100644 --- a/pyepr/hardware/ETH_awg_load.py +++ b/pyepr/hardware/ETH_awg_load.py @@ -2,7 +2,7 @@ import scipy.signal as sig from pyepr.classes import Parameter from pyepr.dataset import create_dataset_from_axes, create_dataset_from_sequence -from pyepr.pulses import Pulse +import pyepr.pulses as pulses from scipy.integrate import cumulative_trapezoid from deerlab import correctphase from warnings import warn @@ -26,7 +26,11 @@ def uwb_load(matfile: np.ndarray, options: dict = dict(), verbosity=0, # Extract Data estr = matfile[matfile['expname']] - conf = matfile['conf'] + if 'settings' in matfile.keys(): + conf = matfile['settings']['conf'] + else: + conf = matfile['conf'] + def extract_data(matfile): if "dta" in matfile.keys(): @@ -543,7 +547,7 @@ def uwb_eval_match(matfile, sequence=None, scans=None, mask=None,filter_pulse=No The scans to be loaded. mask : list, optional The mask to be used. - filter_pulse : ad.Pulse, optional + filter_pulse : epr.Pulse, optional The pulse to be used as a matched filter. If None, the maximum pulse width will be used. This is only used if filter_type is 'match' filter_type : str, optional The type of filter to be used. Options are 'match', 'cheby2' and 'butter. Default is 'match' @@ -559,7 +563,10 @@ def uwb_eval_match(matfile, sequence=None, scans=None, mask=None,filter_pulse=No # imports Andrin Doll AWG datafiles using a matched filter estr = matfile[matfile['expname']] - conf = matfile['conf'] + if 'settings' in matfile.keys(): + conf = matfile['settings']['conf'] + else: + conf = matfile['conf'] def extract_data(matfile,scans): if "dta" in matfile.keys() and not kwargs.get('ignore_dta',False): @@ -849,12 +856,12 @@ def extract_data(matfile,scans): filter_func = lambda dta, det_frq: match_filter_dc(dta,t,complex_shape,det_frq) - elif isinstance(filter_pulse,Pulse): + elif isinstance(filter_pulse,pulses.Pulse): complex_shape = filter_pulse.build_shape(t) filter_func = lambda dta, det_frq: match_filter_dc(dta,t,complex_shape,det_frq) - elif (filter_type.lower() == 'cheby2') or (filter_type.lower() == 'butter'): + elif filter_type.lower() in ['cheby2','butter','boxcar']: if filter_width is None: raise ValueError('You must provide a filter width for the cheby2 or butter filter') @@ -954,7 +961,180 @@ def scipy_filter_dc(dta,t,filter_width,offset_freq,sampling_freq,filter_type='ch filter_sos = sig.cheby2(10,40,(offset_freq-filter_width,offset_freq+filter_width),fs=sampling_freq,btype="bandpass",output='sos') elif filter_type == 'butter': filter_sos = sig.butter(10,(offset_freq-filter_width,offset_freq+filter_width),fs=sampling_freq,btype="bandpass",output='sos') + elif filter_type == 'boxcar': + # A boxcar is just a moving average, so we can implement it with a convolution + boxcar_len = int(2*filter_width*sampling_freq) + if boxcar_len % 2 == 0: + boxcar_len += 1 + boxcar = np.ones(boxcar_len)/boxcar_len + filtered = sig.convolve(dta,boxcar,mode='same') + filtered_dc = digitally_upconvert(t,filtered,-offset_freq) + return filtered_dc + filtered = sig.sosfilt(filter_sos,dta) filtered_dc = digitally_upconvert(t,filtered,-offset_freq) return filtered_dc +# --------------------------------------------------------------------------- +# ETH UWB data to xarray +# --------------------------------------------------------------------------- + +def extract_data(matfile,estr): + if "dta" in matfile.keys(): + nAvgs = matfile["nAvgs"] + dta = [matfile["dta"]] + + elif "dta_001" in matfile.keys(): + dta = [] + nAvgs = 0 + for ii in range(1, estr["avgs"]+1): + actname = 'dta_%03u' % ii + if actname in matfile.keys(): + single_scan = matfile[actname] + # Only keep it if the average is complete, unless it is + # the first + if np.sum(single_scan[..., -1]) == 0 and ii > 1: # incomplete scan + if (nAvgs+1) != ii: + print(f"Scan {ii} is incomplete and will be skipped.") + continue + elif np.sum(single_scan[..., -1]) == 0 and ii == 1: + nAvgs = 0 + else: + nAvgs += 1 + dta.append(single_scan) + dta = np.array(dta) + dta = np.atleast_2d(dta) + if dta.ndim > 2: + dta = np.swapaxes(dta,-1,-2) + return dta, nAvgs + + +def get_metadata(estr,conf): + metadata = {} + metadata['averages'] = estr['avgs'] + metadata['reptime'] = estr['reptime'] + metadata['shots'] = estr['shots'] + metadata['B'] = estr['B'] + metadata['name'] = estr['name'] + metadata['freq'] = estr['LO'] + for i,event in enumerate(estr['events']): + metadata.update(get_pulse_metadata(event,i)) + metadata['dig_rate'] = conf['std']['dig_rate'] + return metadata + +def detect_pulse_type(pulse): + if 'det_len' in pulse: + return pulses.Detection + if pulse['pulsedef']['type'] == 'chirp': + if 'nu_final' in pulse['pulsedef']: + return pulses.ChirpPulse + else: + return pulses.RectPulse + +def get_pulse_metadata(pulse,i): + pulse_type = detect_pulse_type(pulse) + fixed_params = {} + if issubclass(pulse_type, pulses.Detection): + type_str="det" + else: + type_str="pulse" + + fixed_params[f"{type_str}{i}_t"] = pulse['t'] + + if issubclass(pulse_type,pulses.Detection): + fixed_params[f"{type_str}{i}_tp"] = pulse['det_len'] + else: + fixed_params[f"{type_str}{i}_tp"] = pulse['pulsedef']['tp'] + fixed_params[f"{type_str}{i}_scale"] = pulse['pulsedef']['scale'] + + if issubclass(pulse_type,pulses.Detection): + fixed_params[f"{type_str}{i}_freq"] = pulse['det_frq'] + elif issubclass(pulse_type, pulses.FrequencySweptPulse): + fixed_params[f"{type_str}{i}_init_freq"] = pulse['pulsedef']['nu_init'] + fixed_params[f"{type_str}{i}_final_freq"] = pulse['pulsedef']['nu_final'] + else: # Monochromatic pulse + fixed_params[f"{type_str}{i}_freq"] = pulse['pulsedef']['nu_init'] + return fixed_params + +def get_dig_level(data,metadata,conf): + max_val = data.max() + dig_max = conf['std']['dig_max'] + acqs = metadata['shots'] * metadata['nAvgs']* metadata['nPcyc'] + dig_pc = max_val/(dig_max*acqs) + return dig_pc + +def get_nPcyc(estr): + nPcyc = 1 + if not isinstance(estr['parvars'],list): + estr['parvars'] = [estr['parvars']] + for parvar in estr['parvars']: + if 'reduce' in parvar and parvar['reduce'] == 1: + nPcyc *= parvar['axis'].size + return nPcyc + +default_labels = ['X','Y','Z','T'] +def get_coords(parvars): + coords = {} + i=-1 + if not isinstance(parvars,list): + parvars = [parvars] + for parvar in parvars: + if 'reduce' in parvar and parvar['reduce'] == 1: + continue + i += 1 + for variable in np.atleast_1d(parvar['variables']): + if '.' in variable: + event,var = variable.split('.') + event_num = int(event.split('{')[1].split('}')[0]) + + coord_name = f"pulse{event_num}_{var}" + else: + coord_name = f"{variable}" + coord_axis = np.array(parvar['axis']).astype(float) + coords[coord_name] = (default_labels[i],coord_axis) + + return coords + +def ETHUWB_xarray_load(matfile, sum_scans=True,): + """ + This function reads data from an Andrin Doll ETH UWB EPR spectrometer + and converts it into an xarray.Dataset format compatible with PyEPR. + + Parameters + ---------- + matfile : dict + The data file to be loaded. + sum_scans : bool, optional + Whether to sum all scans or not. Default is True. + Returns + ------- + output : xarray.Dataset + The data in the xarray format. + + Notes and Limitations + --------------------------- + + - This function currently only supports experiments with reduced phase cycles. + """ + estr = matfile[matfile['expname']] + conf = matfile['conf'] + + data,nAvgs = extract_data(matfile,estr) + metadata = get_metadata(estr,conf) + metadata['nPcyc'] = get_nPcyc(estr) + metadata['nAvgs'] = nAvgs + metadata['dig_pc'] = get_dig_level(data,metadata,conf) + data_shape = data.shape # [nAvgs, ..., tx] + axes_labels = default_labels[:len(data_shape)-2] + ['tx'] + base_axes = [np.arange(data_shape[i+1]) for i in range(len(data_shape)-1)] + + if sum_scans: + data = data.sum(axis=0) + # Add scan axis label + else: + axes_labels = ['scan'] + axes_labels + base_axes = [np.arange(nAvgs)] + base_axes + dataset = create_dataset_from_axes(data,base_axes,params=metadata, + extra_coords=get_coords(estr['parvars']), + axes_labels=axes_labels) + return dataset diff --git a/pyepr/hardware/PyEPR_control.py b/pyepr/hardware/PyEPR_control.py index 692eba5..f7c66bc 100644 --- a/pyepr/hardware/PyEPR_control.py +++ b/pyepr/hardware/PyEPR_control.py @@ -11,7 +11,7 @@ # PyEPR imports from pyepr.classes import Interface, Parameter from pyepr.pulses import Delay, Detection -from pyepr.sequences import Sequence, HahnEchoSequence +from pyepr.sequences import Sequence, HahnEchoSequence, FieldSweepSequence log = logging.getLogger("interface") @@ -33,11 +33,12 @@ def __init__(self,config_file_path:str): super().__init__() - self.IFgain_options = np.array([0, 20, 40]) - self.IFgain = 2 + self.IFgain_options = np.array([0,20,40]) #np.array([0, 20, 40]) + self.IFgain = 1 #2 self.config_file = config_file_path self.server = None + self.cur_exp = None @property def savefolder(self): @@ -88,12 +89,21 @@ def disconnect(self): return True def acquire_dataset(self,verbosity=0,sum_scans=True, **kwargs): + + if isinstance(self.cur_exp, FieldSweepSequence): + kwargs['filter_type'] = kwargs.pop('filter_type', 'boxcar') + kwargs['filter_width'] = kwargs.pop('filter_width', 250) + + else: + kwargs['filter_type'] = kwargs.get('filter_type', 'cheby') + kwargs['filter_width'] = kwargs.get('filter_width', 100) for i in range(60): args = kwargs.copy() - args['downconvert'] = True + if 'downconvert' not in args: + args['downconvert'] = True - response = requests.get(self.server + '/get_data', json={'downconvert': True}) + response = requests.get(self.server + '/get_data', json=args) if 'error' in response.json(): if verbosity > 0: log.warning(response.json()['error']) @@ -101,7 +111,9 @@ def acquire_dataset(self,verbosity=0,sum_scans=True, **kwargs): continue elif 'data' in response.json(): break - time.sleep(2) + else: + log.error(f"Unexpected response: {response.json()}") + time.sleep(2) if 'data' in response.json(): data = pickle.loads(response.json()['data'].encode('latin1')) #xr.datarray @@ -110,12 +122,15 @@ def acquire_dataset(self,verbosity=0,sum_scans=True, **kwargs): else: return data else: + log.error(f"No data found in response. {response.json()}") + raise RuntimeError("No data returned from server") def get_buffer(self,verbosity=0,sum_scans=True, **kwargs): for i in range(60): args = kwargs.copy() - args['downconvert'] = False + if 'downconvert' not in args: + args['downconvert'] = False response = requests.get(self.server + '/get_databuffer', json=args) if 'error' in response.json(): @@ -136,6 +151,25 @@ def get_buffer(self,verbosity=0,sum_scans=True, **kwargs): else: raise RuntimeError("No data returned from server") + def status(self): + """Returns the status of the spectrometer. + + Returns + ------- + dict + A dictionary containing the status of the spectrometer. + """ + response = requests.get(self.server + '/status') + if 'error' in response.json(): + raise RuntimeError(response.json()['error']) + else: + # Depickle + status = response.json() + data = pickle.loads(status['status'].encode('latin1')) + return data + + + def set_param(self, param: str, value: float): """Set a parameter for the spectrometer. @@ -158,14 +192,20 @@ def terminate(self): log.debug(response.json()['message']) return True - + def status(self): + try: + response = requests.get(self.server + "/status", timeout=10) + except Exception as e: + print(f"Error updating status: {e}") + return pickle.loads(response.json()['status'].encode('latin1')) + def launch(self, sequence: Sequence , savename: str, IFgain=None, *args,**kwargs): - + self.savefolder = self.savefolder # increase the detection length to a minimum 1024ns for pulse in sequence.pulses: if isinstance(pulse, Detection): - if pulse.tp.value < 128: - pulse.tp.value = 128 + if pulse.tp.value < 256: + pulse.tp.value = 256 if (IFgain is None) or (IFgain is True): test_IF = True @@ -213,7 +253,7 @@ def launch(self, sequence: Sequence , savename: str, IFgain=None, *args,**kwargs self._launch(sequence,savename,IFgain, *args,**kwargs) else: - raise ValueError(f"IFgain must be of type [None, bool, int, float]. {IFgain} is not valid.") + raise ValueError(f"IFgain must be of type [None, bool, int, float]. Type {type(IFgain)}_{IFgain} is not valid.") def _launch(self, sequence: Sequence , savename: str, IFgain=0,reset_cur_exp=True,*args,**kwargs): @@ -248,7 +288,7 @@ def isrunning(self) -> bool: return response.json()['isrunning'] - def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): + def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400,tau=500,IFgain=None): """Generates a rectangular pi and pi/2 pulse of the given length at the given field position. This value is stored in the pulse cache. @@ -274,22 +314,31 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): """ amp_tune =HahnEchoSequence( - B=B, freq=freq, reptime=reptime, averages=1, shots=shots + B=B, freq=freq, reptime=reptime, averages=1, shots=shots, tau=tau ) - scale = Parameter("scale",0,dim=45,step=0.02) + scale = Parameter("scale",0.01,dim=45,step=0.02) amp_tune.pulses[0].tp.value = tp amp_tune.pulses[0].scale = scale amp_tune.pulses[1].tp.value = tp * 2 amp_tune.pulses[1].scale = scale + amp_tune.pulses[2].tp.value = 512 + amp_tune.evolution([scale]) - self.launch(amp_tune, "autoDEER_amptune") - + self.launch(amp_tune, "autoDEER_amptune",IFgain=IFgain) + time.sleep(15) while self.isrunning(): - time.sleep(10) - dataset = self.acquire_dataset() + time.sleep(2) + dataset = self.acquire_dataset(downconvert=True,reduce=True,filter_type='boxcar',filter_width=250) + # Check if zero's in data + if np.any(dataset.data == 0): + warnings.warn("Zero values found in dataset. This may indicate an error in acquisition.") + time.sleep(5) + while self.isrunning(): + time.sleep(2) + dataset = self.acquire_dataset(downconvert=True,reduce=True,filter_type='boxcar',filter_width=250) dataset = dataset.epr.correctphase data = np.abs(dataset.data) @@ -300,16 +349,17 @@ def tune_rectpulse(self,*,tp, freq, B, reptime, shots=400): if scale == 0: warnings.warn("Pulse tuned with a scale of zero!") + print("Pulse tuned with a scale of zero!") p90 = amp_tune.pulses[0].copy( - scale=scale, freq=amp_tune.freq) + scale=scale, freq=0) p180 = amp_tune.pulses[1].copy( - scale=scale, freq=amp_tune.freq) + scale=scale, freq=0) return p90, p180 - def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): + def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400, IFgain=None,tau=500): """Tunes a single pulse a range of methods. Parameters @@ -346,7 +396,7 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): elif hasattr(pulse, "final_freq") & hasattr(pulse, "BW"): c_frq = pulse.final_freq.value - 0.5*pulse.BW.value + freq elif hasattr(pulse, "init_freq") & hasattr(pulse, "final_freq"): - c_frq = 0.5*(pulse.final_freq.value + pulse.final_freq.value) + freq + c_frq = 0.5*(pulse.init_freq.value + pulse.final_freq.value) + freq # Find rect pulses if mode == "amp_hahn": @@ -355,11 +405,11 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): elif pulse.flipangle.value == np.pi/2: tp = pulse.tp.value - pi2_pulse, pi_pulse = self.tune_rectpulse(tp=tp, B=B, freq=c_frq, reptime=reptime) + pi2_pulse, pi_pulse = self.tune_rectpulse(tp=tp, B=B, freq=c_frq, reptime=reptime, IFgain=IFgain,shots=shots) amp_tune =HahnEchoSequence( B=B, freq=freq, reptime=reptime, averages=1, shots=shots, - pi2_pulse = pulse, pi_pulse=pi_pulse + pi2_pulse = pulse, pi_pulse=pi_pulse,tau=tau ) scale = Parameter('scale',0,unit=None,step=0.02, dim=45, description='The amplitude of the pulse 0-1') @@ -367,7 +417,7 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): amp_tune.evolution([scale]) - self.launch(amp_tune, "autoDEER_amptune") + self.launch(amp_tune, "autoDEER_amptune", IFgain=IFgain) while self.isrunning(): time.sleep(10) @@ -386,35 +436,42 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): return pulse elif mode == "amp_nut": - pi2_pulse, pi_pulse = self.tune_rectpulse(tp=12, B=B, freq=c_frq, reptime=reptime) + + pi2_pulse, pi_pulse = self.tune_rectpulse(tp=16, B=(B/freq*c_frq), freq=c_frq, reptime=reptime, IFgain=IFgain,shots=shots) nut_tune = Sequence( - name="nut_tune", B=(B/freq*c_frq), freq=freq, reptime=reptime, + name="nut_tune", B=(B/freq*c_frq), freq=c_frq, reptime=reptime, averages=1,shots=shots ) - nut_tune.addPulse(pulse.copy( - t=0, pcyc={"phases":[0],"dets":[1]}, scale=0)) + f_shift = freq - c_frq + test_pulse = pulse.copy( + t=0, pcyc={"phases":[0],"dets":[1]}, scale=0) + if hasattr(test_pulse,"freq"): + test_pulse.freq.value = pulse.freq.value + f_shift + elif hasattr(test_pulse, "init_freq") & hasattr(pulse, "final_freq"): + test_pulse = pulse.init_freq.value + f_shift + test_pulse = pulse.final_freq.value - f_shift + nut_tune.addPulse(test_pulse) nut_tune.addPulse( - pi2_pulse.copy(t=2e3, - pcyc={"phases":[0, np.pi],"dets":[1, -1]}, - freq=c_frq-freq)) + pi2_pulse.copy(t=2000, + pcyc={"phases":[0],"dets":[1]}, + freq=0)) nut_tune.addPulse( - pi_pulse.copy(t=2.5e3, pcyc={"phases":[0],"dets":[1]}, - freq=c_frq-freq)) - nut_tune.addPulse(Detection(t=3e3, tp=512, freq=c_frq-freq)) + pi_pulse.copy(t=2000+tau, pcyc={"phases":[0],"dets":[1]}, + freq=0)) + nut_tune.addPulse(Detection(t=3000, tp=512, freq=c_frq-freq)) scale = Parameter('scale',0,unit=None,step=0.02, dim=45, description='The amplitude of the pulse 0-1') nut_tune.pulses[0].scale = scale nut_tune.evolution([scale]) - # nut_tune.addPulsesProg( # pulses=[0], # variables=["scale"], # axis_id = 0, # axis= np.arange(0,0.9,0.02) # ) - self.launch(nut_tune, "autoDEER_amptune") - + self.launch(nut_tune, "autoDEER_amptune", IFgain=IFgain) + time.sleep(10) while self.isrunning(): time.sleep(10) dataset = self.acquire_dataset() @@ -436,7 +493,10 @@ def tune_pulse(self, pulse, mode, freq, B , reptime, shots=400): pulse.scale = Parameter('scale',new_amp,unit=None,description='The amplitude of the pulse 0-1') return pulse - + + else: + raise ValueError(f"Mode {mode} not recognised. Available modes are: 'amp_hahn', 'amp_nut'") + def tune(self,*, sequence=None, mode="amp_hahn", freq=None, gyro=None): if mode == "rect_tune": @@ -527,3 +587,31 @@ def tune(self,*, sequence=None, mode="amp_hahn", freq=None, gyro=None): return sequence + def get_config(self): + """Returns the current configuration of the spectrometer.""" + try: + response = requests.get(f"{self.server}/get_config") + if response.status_code == 200: + config_pkl = response.json()['config'] + config = pickle.loads(config_pkl.encode('latin1')) + return config + except Exception as e: + print(f"Error getting config: {e}") + return None + + def set_config(self, config): + """Sets the configuration of the spectrometer. + + Parameters + ---------- + config : dict + The configuration to set. + """ + try: + config_pkl = pickle.dumps(config) + response = requests.post(f"{self.server}/set_config", data=config_pkl) + if response.status_code == 200: + return True + except Exception as e: + print(f"Error setting config: {e}") + return False diff --git a/pyepr/hardware/XeprAPI_link.py b/pyepr/hardware/XeprAPI_link.py index 074cafd..546c023 100644 --- a/pyepr/hardware/XeprAPI_link.py +++ b/pyepr/hardware/XeprAPI_link.py @@ -23,12 +23,25 @@ class XeprAPILink: - def __init__(self, config_file: str = None) -> None: + def __init__(self, config_file: str = None, **kwargs) -> None: + """ + This class links to the Bruker Xepr API to control the spectrometer. + Parameters + ---------- + config_file : str, optional + Path to a PyEPR spectrometer configuration file, by default None + kwargs : dict, optional + Additional keyword arguments: + - retry_attempts : int + Number of times to retry getting a Xepr parameter before + raising an exception. Default is 50. + """ self.Xepr = None self.cur_exp = None self.hidden = None self._tmp_dir = None self.XeprCmds = None + self.retry_attempts = kwargs.get('retry_attempts', 50) if config_file is not None: with open(config_file, mode='r') as file: config = yaml.safe_load(file) @@ -66,7 +79,7 @@ def connect(self) -> None: def log_xepr_version(self): """Logs the Xepr and Linacq versions for debugging purposes.""" - packages = ['xper','linacq'] + packages = ['xepr','linacq'] for package in packages: details = get_package_version_from_dnf(package) @@ -87,7 +100,7 @@ def _get_Xepr_global(self): def _xepr_retry(self, func, *args, **kwargs): - for i in range(0, 50): + for i in range(0, self.retry_attempts): try: return func(*args, **kwargs) except Exception as e: diff --git a/pyepr/hardware/dummy.py b/pyepr/hardware/dummy.py index a4bdd4f..e5cf50f 100644 --- a/pyepr/hardware/dummy.py +++ b/pyepr/hardware/dummy.py @@ -162,7 +162,7 @@ def __init__(self,config_file) -> None: self.mode = self.dummyResonator.mode - super().__init__(log=hw_log) + super().__init__(self.config,log=hw_log) def launch(self, sequence, savename: str, **kwargs): hw_log.info(f"Launching {sequence.name} sequence") diff --git a/pyepr/hardware/dummy_xepr.py b/pyepr/hardware/dummy_xepr.py index 546fa93..ae24b8f 100644 --- a/pyepr/hardware/dummy_xepr.py +++ b/pyepr/hardware/dummy_xepr.py @@ -232,7 +232,7 @@ def generate_phase_experiment(numpoints,pg,phase,phase_offset=0.2): t = range(0,numpoints) for i in t: ta,V = generate_hahn_echo_transient(pg,phase,phase_offset) - vals[i,] = np.trapz(V,x=ta) + vals[i,] = np.trapezoid(V,x=ta) return t, vals diff --git a/pyepr/pulses.py b/pyepr/pulses.py index 380a2a4..e9148e5 100644 --- a/pyepr/pulses.py +++ b/pyepr/pulses.py @@ -13,9 +13,7 @@ import copy from functools import reduce from itertools import accumulate -from numba import njit -#@njit def compute_upulses_not_trajectory(dUs): n_offsets, n_steps, _, _ = dUs.shape Upulses = np.empty((n_offsets, 2, 2), dtype=np.complex128) @@ -26,7 +24,6 @@ def compute_upulses_not_trajectory(dUs): Upulses[i] = U return Upulses -#@njit def compute_upulses_trajectory(dUs): n_offsets, n_steps, _, _ = dUs.shape Upulses = np.empty((n_offsets, n_steps + 1, 2, 2), dtype=np.complex128) @@ -39,7 +36,6 @@ def compute_upulses_trajectory(dUs): Upulses[i, j + 1] = U return Upulses -#@njit def compute_magnetization_not_trajectory(Upulses, density0, Mmag): n_offsets = Upulses.shape[0] density = np.empty((n_offsets, 2, 2), dtype=np.complex128) @@ -71,7 +67,6 @@ def compute_magnetization_not_trajectory(Upulses, density0, Mmag): return Mag * Mmag[:, None] -#@njit def compute_magnetization_trajectory(Upulses, density0): n_offsets, n_steps = Upulses.shape[:2] density = np.empty((n_offsets, n_steps, 2, 2), dtype=np.complex128) @@ -329,7 +324,7 @@ def flip(self): @property def amp_factor(self): """ The B1 amplitude factor (nutation frequency) for the pulse in GHz""" - amp_factor_value= self.flipangle.value / (2 * np.pi * np.trapz(self.AM,self.ax)) + amp_factor_value= self.flipangle.value / (2 * np.pi * np.trapezoid(self.AM,self.ax)) return Parameter("amp_factor", amp_factor_value, "GHz", "Amplitude factor for the pulse") @@ -341,8 +336,7 @@ def exciteprofile_old(self, freqs=None, resonator = None): This function is ported from EasySpin (https://easyspin.org/easyspin/documentation/sop.html) [1-2], - and based upon the method from Gunnar Jeschke, Stefan Pribitzer and - Andrin Doll[3]. + and based upon the method from Gunnar Jeschke, Stefan Pribitzer and Andrin Doll[3]. References: +++++++++++ @@ -478,13 +472,27 @@ def exciteprofile(self, freqs=None, resonator=None, trajectory=False): offsets = np.linspace(-2*BW, 2*BW, 100) + c_freq else: offsets = freqs - - ISignal = np.real(self.complex) * self.amp_factor.value - QSignal = np.imag(self.complex) * self.amp_factor.value + + if self.scale is None or self.scale.value is None: + scale=None + amp_factor = self.amp_factor.value + elif (self.scale.unit is not None) and (self.scale.unit.lower() in ['ghz','mhz']): + scale = 1.0 + if self.scale.unit.lower() == 'mhz': + amp_factor = self.scale.value/1000 + else: + amp_factor = self.scale.value + elif self.scale is not None: + scale = self.scale.value + amp_factor = self.amp_factor.value + else: + scale=None + amp_factor = self.amp_factor.value if resonator is not None: + # Apply resonator correction to the pulse AM function FM = self.FM - if self.scale is None or self.scale.value is None: + if scale is None: amp_factor = np.interp(FM, resonator.freqs-resonator.freq_c, resonator.profile) amp_factor = np.min([amp_factor,np.ones_like(amp_factor)*self.amp_factor.value],axis=0) else: @@ -496,6 +504,12 @@ def exciteprofile(self, freqs=None, resonator=None, trajectory=False): ISignal = np.real(self.complex) * amp_factor QSignal = np.imag(self.complex) * amp_factor + + else: + # No resonator correction so use a constant amplitude + ISignal = np.real(self.complex) * amp_factor # In GHz + QSignal = np.imag(self.complex) * amp_factor # In GHz + t= self.ax M0=[0, 0, 1] @@ -610,7 +624,7 @@ def _pcyc_str(self): def __str__(self): # Build header line - header = "#" * 79 + "\n" + "autoDEER Pulse Definition" + \ + header = "#" * 79 + "\n" + "PyEPR Pulse Definition" + \ "\n" + "#" * 79 + "\n" # Build Overviews @@ -685,7 +699,7 @@ def __str__(self): # Build Footer footer = "#" * 79 + "\n" +\ - f"Built by autoDEER Version: {__version__}" + "\n" + "#" * 79 + f"Built by PyEPR Version: {__version__}" + "\n" + "#" * 79 # Combine All string = header + overview_params + param_table + prog_table +\ @@ -964,20 +978,41 @@ class GaussianPulse(Pulse): Represents a Gaussian monochromatic pulse. """ - def __init__(self, *, tp=32,FWHM=16, freq=0, **kwargs) -> None: - """ Represents a Gaussian monochromatic pulse. + def __init__(self, *, tp=32,FWHM=None, freq=0, **kwargs) -> None: + """ + By default, the FWHM is set to tp/(2*np.sqrt(2*np.log(2))). + Parameters ---------- tp : float - Pulse length in ns, by default 128 + Pulse length in ns, by default 32 FWHM : float, - The full width at half maximum of the pulse + The full width at half maximum of the pulse. Defaults to tp/(2*np.sqrt(2*np.log(2))). freq : float, optional The frequency of the pulse, by default 0 + + + Examples + -------- + + .. plot:: + :include-source: + + import pyepr as epr + import matplotlib.pyplot as plt + + GuassPulse = epr.GaussianPulse(tp=32) + GuassPulse.plot(pad=0) + plt.annotate('', xy=(-GuassPulse.FWHM.value/2, 0.5), xytext=(GuassPulse.FWHM.value/2, 0.5), color='C1',arrowprops=dict(arrowstyle='<|-|>', color='C1',lw=2)) + plt.annotate('FWHM', xy=(0, 0.51), xytext=(0, 0.51), color='C1', ha='center', fontsize=12) """ Pulse.__init__(self, tp=tp,name='GaussianPulse', **kwargs) self.freq = Parameter("Freq", freq, "GHz", "Frequency of the Pulse") + + if FWHM is None: + FWHM = tp / (2 * np.sqrt(2 * np.log(2))) + self.FWHM = Parameter("FWHM", FWHM, "ns", "Full Width at Half Maximum of the Pulse") self._buildFMAM(self.func) pass @@ -988,6 +1023,9 @@ def func(self, ax): AM = np.exp(-ax**2 / (2 * sigma**2)) FM = np.zeros(nx) + self.freq.value return AM, FM + + def plot(self, pad=1000): + return super().plot(pad) # ============================================================================= class FrequencySweptPulse(Pulse): """ @@ -1008,8 +1046,12 @@ def __init__(self, *, tp, t=None, scale=None, flipangle=None, pcyc=[0], name=Non elif "final_freq" in kwargs: self.final_freq = Parameter("final_freq", kwargs["final_freq"], "GHz", "Final frequency of pulse") self.init_freq = Parameter("init_freq", self.final_freq.value - BW, "GHz", "Initial frequency of pulse") + elif "freq" in kwargs: + # Assumes symmetric sweep about central frequency + self.init_freq = Parameter("init_freq", kwargs["freq"] - BW/2, "GHz", "Initial frequency of pulse") + self.final_freq = Parameter("final_freq", kwargs["freq"] + BW/2, "GHz", "Final frequency of pulse") else: - raise ValueError("Bandwidth must be combined with either an initial or final frequency") + raise ValueError("Bandwidth must be combined with either an initial, final or center frequency") elif ("init_freq" in kwargs) & ("final_freq" in kwargs): self.init_freq = Parameter("init_freq", kwargs["init_freq"], "GHz", "Initial frequency of pulse") self.final_freq = Parameter("final_freq", kwargs["final_freq"], "GHz", "Final frequency of pulse") @@ -1078,7 +1120,7 @@ def func(self, ax): # beta**beta_exp2 * (ax[cut:-1]/tp)**order2) # FM = BW * cumulative_trapezoid(AM**2,ax,initial=0) /\ - # np.trapz(AM**2,ax) + self.init_freq.value + # np.trapezoid(AM**2,ax) + self.init_freq.value sech = lambda x: 1/np.cosh(x) cut = round(nx/2) AM = np.zeros_like(ti) @@ -1100,6 +1142,8 @@ def sweeprate(self): """ The sweep rate of the pulse in GHz/ns""" sweeprate_value = self.beta.value * self.bandwidth.value / (2*self.tp.value) return Parameter("sweeprate", sweeprate_value, "GHz/ns", "Sweep rate of the pulse") + + # ============================================================================= @@ -1108,20 +1152,28 @@ class ChirpPulse(FrequencySweptPulse): Represents a linear frequency-swept pulse. """ - def __init__(self, *, tp=128, **kwargs) -> None: + def __init__(self, *, tp=128,rise_time=10, **kwargs) -> None: FrequencySweptPulse.__init__(self, tp=tp,name='ChirpPulse', **kwargs) + self.rise_time = Parameter("rise_time", rise_time, "ns", "The rise time of the pulse") self._buildFMAM(self.func) pass def func(self, ax): nx = ax.shape[0] + # Use a quarter sine wave for the rise and fall + rise_pts = int(self.rise_time.value/(ax[1]-ax[0])) + if rise_pts > nx/2: + rise_pts = int(nx/2) AM = np.ones(nx) + AM[0:rise_pts] = np.sin((np.pi/2)*(ax[0:rise_pts]/self.rise_time.value))**2 + AM[-rise_pts:] = np.sin((np.pi/2)*((self.tp.value-ax[-rise_pts:])/self.rise_time.value))**2 FM = np.linspace( self.init_freq.value, self.final_freq.value, nx) return AM, FM + @property def sweeprate(self): diff --git a/pyepr/relaxation_analysis.py b/pyepr/relaxation_analysis.py index 93d0b17..fd3a2b0 100644 --- a/pyepr/relaxation_analysis.py +++ b/pyepr/relaxation_analysis.py @@ -10,7 +10,7 @@ from pyepr.colors import primary_colors # =========================================================================== - +# T2-type relaxation analysis classes class CarrPurcellAnalysis(): def __init__(self, dataset, sequence: Sequence = None) -> None: @@ -485,7 +485,9 @@ def __call__(self, x, norm=True, SNR=False, source=None): return V[0] else: return V - + +# T1-type relaxation analysis classes + class ReptimeAnalysis(): def __init__(self, dataset, sequence: Sequence = None) -> None: diff --git a/pyepr/resonator_profile_analysis.py b/pyepr/resonator_profile_analysis.py index 5a550b2..734f868 100644 --- a/pyepr/resonator_profile_analysis.py +++ b/pyepr/resonator_profile_analysis.py @@ -40,7 +40,7 @@ def __init__( Fitting fuction: - .. math:: \\nu(t) = a \cos(2\pi f (t - x_0)) e^{-(t - x_0)/\tau} + k + .. math:: \\nu(t) = a \\cos(2\\pi f (t - x_0)) e^{-(t - x_0)/\\tau} + k where :math:`a` is the amplitude, :math:`f` is the nutation frequency, :math:`\\tau` is the decay time, :math:`x_0` is the offset and :math:`k` is a constant offset. @@ -64,8 +64,17 @@ def __init__( p0 : list, optional The initial guess for the fit, by default None. If not given the guess is set to [50e-3,150,1,0] """ + # Rotate the dataset so that first axis is 'pulse0_tp' and second is 'LO' + + + tp_dim = dataset.coords['pulse0_tp'].dims[0] + B_dim = dataset.coords['B'].dims[0] + if tp_dim != dataset.dims[0]: + dataset = dataset.transpose(tp_dim,B_dim, transpose_coords=True) + + - if np.iscomplexobj(dataset): + if np.iscomplexobj(dataset) and not kwargs.get('skip_phase_correction',False): self.dataset = phase_correct_respro(dataset) else: self.dataset = dataset @@ -150,12 +159,12 @@ def process_nutations( return self.prof_data, self.prof_frqs - def _process_fit(self,R_limit=0.5,mask=None,debug=False): + def _process_fit(self,R_limit=0.5,mask=None,debug=False,**kwargs): """Processes the nutation data by fitting a cosine function to each frequency in the dataset. Function used for fitting: - .. math:: \\nu(t) = a \cos(2\pi f (t - x_0)) e^{-(t - x_0)/\tau} + k + .. math:: \\nu(t) = a \\cos(2\\pi f (t - x_0)) e^{-(t - x_0)/\\tau} + k where :math:`a` is the amplitude, :math:`f` is the nutation frequency, :math:`\\tau` is the decay time, :math:`x_0` is the offset and :math:`k` is a constant offset. @@ -220,6 +229,7 @@ def _process_fit(self,R_limit=0.5,mask=None,debug=False): self.profile_ci[i] = np.nan # Remove nan + old_freqs=self.freqs self.freqs = self.freqs[~np.isnan(self.profile)] self.profile = self.profile[~np.isnan(self.profile)] self.profile_ci = self.profile_ci[~np.isnan(self.profile_ci)] @@ -242,7 +252,7 @@ def _process_fit(self,R_limit=0.5,mask=None,debug=False): for i in range(n_excluded): key = int(list(excluded.keys())[i]) nutation = self.dataset[:,key] - freq = self.freqs[key].values + freq = old_freqs[key].values if np.iscomplexobj(nutation): nutation = nutation.real nutation = nutation/np.max(nutation) @@ -581,11 +591,11 @@ def calc_overlap(x, func1, func2): """ y1 = func1(x) y2 = func2(x) - area_1 = np.trapz(y1, x) - area_2 = np.trapz(y2, x) + area_1 = np.trapezoid(y1, x) + area_2 = np.trapezoid(y2, x) y1 /= area_1 y2 /= area_2 - area_12 = np.trapz(y1*y2, x) + area_12 = np.trapezoid(y1*y2, x) return area_12 def BSpline_extra(tck_s): @@ -633,3 +643,86 @@ def optimise_spectra_position(resonator_profile, fieldsweep, verbosity=0): return new_LO + +class AmplifierLinearityAnalysis: + + def __init__(self, dataset,attenuator=0) -> None: + self.dataset = dataset + self.attenuator = attenuator + + if "pulse0_scale" in self.dataset.coords: + self.scales= self.dataset.pulse0_scale + + if "pulse0_tp" in self.dataset.coords: + self.tp = self.dataset.pulse0_tp + + self._process_fit() + + + def _process_fit(self,R_limit=0.5,mask=None): + self.n_scales = self.scales.shape[0] + + self.profile = np.zeros(self.n_scales) + self.profile_ci = np.zeros(self.n_scales) + + fun = lambda x, f, tau,a,k: a*np.cos(2*np.pi*f*x)*np.exp(-x/tau)+ k + bounds = ([5e-3,10,0,-1],[0.3,2000,2,1]) + p0 = [50e-3,150,1,0] + + R2 = lambda y, yhat: 1 - np.sum((y - yhat)**2) / np.sum((y - np.mean(y))**2) + + for i in range(self.n_scales): + if mask is not None: + nutation = self.dataset[mask,i] + x = self.tp[mask] + else: + nutation = self.dataset[:,i] + x = self.tp + if np.iscomplexobj(nutation): + nutation = nutation.epr.correctphase + nutation = nutation/np.max(nutation) + + try: + results = curve_fit(fun, x, nutation, bounds=bounds,xtol=1e-4,ftol=1e-4,p0=p0) + if R2(nutation,fun(x,*results[0])) > R_limit: + self.profile[i] = results[0][0] + + self.profile_ci[i] = np.sqrt(np.diag(results[1]))[0]*1.96 + else: + self.profile[i] = np.nan + self.profile_ci[i] = np.nan + except RuntimeError: + self.profile[i] = np.nan + self.profile_ci[i] = np.nan + + # Remove nan + self.scales = self.scales[~np.isnan(self.profile)] + self.profile = self.profile[~np.isnan(self.profile)] + self.profile_ci = self.profile_ci[~np.isnan(self.profile_ci)] + + # Adjust for attenuator + self.profile = self.profile * 10**(self.attenuator/20) + self.profile_ci = self.profile_ci * 10**(self.attenuator/20) + + def fit(self,order=5): + """ 5th order polynomial fit to the profile """ + if hasattr(self, 'profile'): + self.coefficients = np.polyfit(self.scales, self.profile/self.profile.max(), order) + self.model = np.polyval(self.coefficients, self.scales) + else: + raise ValueError("Profile not calculated. Run _process_fit() first.") + + def plot(self, axs= None, fig=None): + + if axs is None and fig is None: + fig, axs = plt.subplots(1,1,constrained_layout=True,figsize=(8,8)) + + axs.plot(self.scales, self.profile*1e3, label="Profile", color=primary_colors[0], marker='o', markersize=5, linewidth=0,alpha=0.7) + + if hasattr(self, 'model'): + axs.plot(self.scales, self.model*1e3*self.profile.max(), label="Model", color=primary_colors[0], linewidth=2) + axs.text(0.05, 0.95, f"Fit Coeff: {[f'{i:.3g}' for i in self.coefficients]}", transform=axs.transAxes, fontsize=10, verticalalignment='top',bbox=dict(facecolor='white', alpha=0.5, edgecolor='black')) + + axs.legend() + axs.set_xlabel("Pulse amplitude / A.U.") + axs.set_ylabel("Nutation frequency / MHz") \ No newline at end of file diff --git a/pyepr/sequences.py b/pyepr/sequences.py index 55b7f74..6899f7e 100644 --- a/pyepr/sequences.py +++ b/pyepr/sequences.py @@ -78,11 +78,11 @@ def __init__( "The shot repetition time") self.averages = Parameter( - "averages", averages, "None", + "averages", int(averages), "None", "The number of averages to perform.") self.shots = Parameter( - "shots", shots, "None", + "shots", int(shots), "None", "The number of shots per scan.") if "det_window" in kwargs: @@ -363,7 +363,15 @@ def shape(self): else: axes_dim = [1] - return [nAvgs]+ axes_dim +[nPcyc] + # Flip the axes dim so it is Z,Y,X... + axes_dim = axes_dim[::-1] + + shape = [nAvgs]+ axes_dim +[nPcyc] + + # Force list output to all be int + shape = np.array(shape).astype(int).tolist() + + return shape @@ -646,7 +654,7 @@ def __init__(self, *, B, freq, reptime, averages, shots, **kwargs) -> None: Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime : _type_ @@ -714,7 +722,7 @@ def __init__(self, *, B, freq, reptime, averages, shots, **kwargs) -> None: if hasattr(self, "det_event"): self.addPulse(self.det_event.copy(t=2*tau)) else: - self.addPulse(Detection(t=2*tau, tp=self.det_window.value)) + self.addPulse(Detection(t=2*tau, tp=512)) # ============================================================================= @@ -723,12 +731,12 @@ class T1InversionRecoverySequence(Sequence): Represents a T1 Inversion Recovery Sequence. Sequence: - [\pi - \tau - \pi/2 - \tau - echo] + [\pi - t - \pi/2 - \tau - \pi - \tau - echo] Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss LO : int or float The LO frequency in GHz reptime : _type_ @@ -743,6 +751,8 @@ class T1InversionRecoverySequence(Sequence): The step size of the interpulse delay in ns, by default 50 ns dim : int The number of points in the X axis + tau: float + The Hahn echo interpulse delay in ns, by default 500 ns Optional Parameters ------------------- @@ -753,6 +763,62 @@ class T1InversionRecoverySequence(Sequence): An autoEPR Pulse object describing the refocusing pi pulses. If not specified a RectPulse will be created instead. """ + + def __init__(self, *, B, freq, reptime, averages, shots,start=500, step=40, dim=200, **kwargs) -> None: + name = "T1InversionRecoverySequence" + tp = kwargs.get("tp", 12) + + self.tau = Parameter(name="tau", value=500, unit="ns", description="The hahn ehco interpulse delay",virtual=True) + self.t = Parameter(name="t", value=start, step=step, dim=dim, unit="ns", description="The inversion recovery delay",virtual=True) + super().__init__(name=name,B=B, freq=freq, reptime=reptime, averages=averages, shots=shots, **kwargs) + + if "pi_pulse" in kwargs: + self.pi_pulse = kwargs["pi_pulse"] + if "pi2_pulse" in kwargs: + self.pi2_pulse = kwargs["pi2_pulse"] + if "det_event" in kwargs: + self.det_event = kwargs["det_event"] + + if hasattr(self, "pi_pulse"): + pi_pulse = self.addPulse(self.pi_pulse.copy( + t=0, pcyc={"phases":[0], "dets": [1]})) + else: + pi_pulse = self.addPulse(RectPulse( # Pump 1 pulse + t=0, tp=tp, freq=0, flipangle=np.pi + )) + + if hasattr(self, "pi2_pulse"): + self.addPulse(self.pi2_pulse.copy( + t=self.t, pcyc={"phases":[0, np.pi], "dets": [1, -1]})) + else: + self.addPulse(RectPulse( # Exc pulse + t=self.t, tp=tp, freq=0, flipangle=np.pi/2, + pcyc={"phases":[0, np.pi], "dets":[1, -1]} + )) + + if hasattr(self, "pi_pulse"): + pi_pulse = self.addPulse(self.pi_pulse.copy( + t=self.t+self.tau, pcyc={"phases":[0], "dets": [1]})) + else: + pi_pulse = self.addPulse(RectPulse( # Pump 1 pulse + t=self.t+self.tau, tp=tp, freq=0, flipangle=np.pi + )) + if hasattr(self, "det_event"): + self.addPulse(self.det_event.copy(t=self.t+2*self.tau)) + else: + self.addPulse(Detection(t=self.t+2*self.tau, tp=self.det_window.value)) + + self.evolution([self.t]) + + def simulate(self, T1=1e4): + """ + Simulates the T1 recovery as a simple exponential recovery. + """ + func = lambda x, a, T1: a*(1 - 2*np.exp(-x/T1)) + xaxis = val_in_ns(self.t) + data = func(xaxis,1,T1) + data = add_phaseshift(data, 0.05) + return xaxis, data # ============================================================================= @@ -763,7 +829,7 @@ class HahnEchoRelaxationSequence(HahnEchoSequence): Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime : _type_ @@ -814,7 +880,7 @@ class T2RelaxationSequence(HahnEchoRelaxationSequence): Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime : _type_ @@ -854,7 +920,7 @@ def __init__(self, *, B, freq, Bwidth, reptime, averages, shots, **kwargs) -> No Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss Bwidth: int or float The width of the field sweep, in Gauss freq : int or float @@ -926,7 +992,7 @@ def __init__(self, *, B, freq, reptime, reptime_max, averages, shots, start=20, Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime: float @@ -957,7 +1023,7 @@ def __init__(self, *, B, freq, reptime, reptime_max, averages, shots, start=20, step = np.around(step,decimals=-1) step = np.around(step,decimals=-1) reptime = Parameter( - "reptime", reptime, start = min_reptime-reptime, step=step, dim=100, unit="us", + "reptime", reptime, start = min_reptime-reptime, step=step, dim=dim, unit="us", description = "The shot repetition time") super().__init__( @@ -1004,7 +1070,7 @@ def __init__(self, *, B, freq, reptime, averages, shots, Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss freq : int or float The freq frequency in GHz reptime : _type_ @@ -1138,7 +1204,7 @@ def __init__(self, *, B, freq, reptime, averages, shots, fwidth=0.3,dtp=2, **kwa Parameters ---------- B : int or float - The B0 field, in Guass + The B0 field, in Gauss Bwidth: int or float The width of the field sweep, in Gauss freq : int or float @@ -1208,7 +1274,7 @@ def _build_sequence(self): self.freq = Parameter("freq", center_freq, start=-fwidth, step=fstep, dim=dim, unit="GHz", description="frequency") self.B = Parameter( "B",((center_freq)/self.gyro), start=-fwidth/self.gyro, step=fstep/self.gyro, dim=dim, - unit="Guass",link=self.freq,description="B0 Field" ) + unit="Gauss",link=self.freq,description="B0 Field" ) self.addPulse(RectPulse( # Hard pulse t=0, tp=tp, freq=0, flipangle="Hard" @@ -1231,7 +1297,7 @@ def _build_sequence(self): t=tau1+tau2, tp=32, freq=0, flipangle=np.pi )) - self.addPulse(Detection(t=tau1+2*tau2, tp=64)) + self.addPulse(Detection(t=tau1+2*tau2, tp=512)) self.pulses[0].scale.value = 1 diff --git a/pyepr/tools.py b/pyepr/tools.py index 0e19fcf..01ccea3 100644 --- a/pyepr/tools.py +++ b/pyepr/tools.py @@ -2,8 +2,8 @@ import deerlab as dl import numpy as np import logging -from pyepr.dataset import create_dataset_from_bruker -from pyepr.hardware.ETH_awg_load import uwb_load, uwb_eval_match +from pyepr.dataset import create_dataset_from_bruker, downconvert_dataset +from pyepr.hardware.ETH_awg_load import uwb_load, uwb_eval_match, ETHUWB_xarray_load from scipy.io import loadmat from scipy.io.matlab import MatReadError import xarray as xr @@ -12,7 +12,7 @@ def eprload( - path: str, experiment: str = None, type: str = None, + path: str, experiment: str = None, type: str = None,downconvert=True, **kwargs): """ A general versions of eprload @@ -54,7 +54,7 @@ def eprload( type = 'TXT' elif file_extension == '.mat': - log.debug('File detecetd as Matlab') + log.debug('File detected as Matlab') type = 'MAT' else: @@ -64,10 +64,10 @@ def eprload( " set type manually \n Valid file types: '.DSC','.DTA','.h5'," "'.hdf5','.csv','.txt','.mat'") - if type == 'BRUKER': + if type.upper() == 'BRUKER': return create_dataset_from_bruker(path) - elif type == 'TXT': + elif type.upper() == 'TXT': if 'full_output' in kwargs: full_output = kwargs['full_output'] del kwargs['full_output'] @@ -76,7 +76,22 @@ def eprload( data = np.loadtxt(path, *kwargs) return data - elif type == 'MAT': + elif type.upper() == 'MAT': + try: + Matfile = loadmat(path, simplify_cells=True, squeeze_me=True) + except Exception as e: + raise MatReadError("Error opening MatFile") + + dataset = ETHUWB_xarray_load(Matfile,kwargs.get('sum_scans',True)) + + if downconvert and 'tx' in dataset.dims: + datasetDC = downconvert_dataset(dataset,**kwargs) + return datasetDC + else: + return dataset + + + elif type.upper() == 'MAT_OLD': try: Matfile = loadmat(path, simplify_cells=True, squeeze_me=True) except Exception as e: @@ -97,8 +112,14 @@ def eprload( return uwb_output - elif type == 'HDF5': - return xr.load_dataarray(path,engine='h5netcdf',invalid_netcdf=True) + elif type.upper() == 'HDF5': + dataset= xr.load_dataarray(path,engine='h5netcdf',invalid_netcdf=True) + + if downconvert and 'tx' in dataset.dims: + datasetDC = downconvert_dataset(dataset,**kwargs) + return datasetDC + else: + return dataset def progress_bar(progress, post=""): diff --git a/pyepr/utils.py b/pyepr/utils.py index 7189809..98e8d68 100644 --- a/pyepr/utils.py +++ b/pyepr/utils.py @@ -157,7 +157,7 @@ def autoEPRDecoder(dct): def gcd(values:list): - """Generates the greatest common dividor on a list of floats + """Generates the greatest common divisor on a list of floats Parameters ---------- diff --git a/pyproject.toml b/pyproject.toml index bbd42c1..5495370 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pyEPR-ESR" -version = "1.0.2" +version = "1.1" description = "" authors = ["Hugo Karas ", "Gunnar Jeschke ", "Stefan Stoll "] readme = "README.md" @@ -9,17 +9,18 @@ packages = [ ] [tool.poetry.dependencies] -python = ">=3.11,<3.13" -numpy = ">2.1" -scipy = "^1.14.1" -matplotlib = "^3.9.2" +python = ">=3.11,<4.0" +numpy = ">=2.1,<3.0" +scipy = ">=1.14.1" +matplotlib = ">=3.9.2" pyyaml = "^6.0.2" xarray = ">=2025" -h5netcdf = "^1.4.0" -toml = "^0.10.2" -deerlab = "^1.1.4" -numba = ">=0.60.0" -psutil = "^7.1.3" +h5py = ">3.15" +h5netcdf = ">1.4.0" +toml = ">=0.10.2" +deerlab = ">=1.1.4,<2.0" +psutil = ">=7.1.3" +requests = ">=2.32" [tool.poetry.group.dev.dependencies] @@ -28,19 +29,20 @@ pytest = "^8.3.3" [tool.poetry.group.doc.dependencies] -furo = {version = "^2024.8.6"} -autoapi = {version = "^2.0.1"} -sphinx = {version = "^8.1.3"} -sphinx-autoapi = "^3.3.3" -sphinx-toolbox = "^3.8.1" -sphinx-copybutton = "^0.5.2" -numpydoc = "^1.8.0" -sphinx-favicon = "^1.0.1" -sphinx-autobuild = "^2025.8.25" -sphinx-gallery = "^0.19.0" -myst-parser = "^4.0.1" -sphinx-design = "^0.6.1" +furo = {version = ">=2024.8.6"} +autoapi = {version = ">=2.0.1"} +sphinx = {version = ">=8.1.3"} +sphinx-autoapi = ">=3.3.3" +sphinx-toolbox = ">=3.8.1" +sphinx-copybutton = ">=0.5.2" +numpydoc = ">=1.8.0" +sphinx-favicon = ">=1.0.1" +sphinx-autobuild = ">=2025.8.25" +sphinx-gallery = ">=0.19.0" +myst-parser = ">=4.0.1" +sphinx-design = ">=0.6.1" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" + diff --git a/tests/test_pulses.py b/tests/test_pulses.py new file mode 100644 index 0000000..555d196 --- /dev/null +++ b/tests/test_pulses.py @@ -0,0 +1,72 @@ +from autodeer.pulses import * +import pytest + +def test_pulse_init(): + pulse = Pulse(tp=10, scale=0.5) + assert pulse.tp.value == 10 + assert pulse.scale.value == 0.5 + +def test_pulse_add_phase_cycle(): + pulse = Pulse(tp=10, scale=0.5) + pulse._addPhaseCycle([0, 90, 180, 270]) + assert pulse.pcyc["Phases"] == [0, 90, 180, 270] + assert pulse.pcyc["DetSigns"] == [1, 1, 1, 1] + + +def test_pulse_is_static(): + pulse = Pulse(tp=10, scale=0.5) + assert pulse.is_static() + pulse = Pulse(tp=10, scale=0.5, t=0) + assert pulse.is_static() + pulse = Pulse(tp=10, scale=0.5, t=0, pcyc=None) + assert pulse.is_static() + tp = Parameter( + name="tp", value=10, unit="us", description="The pulse length", + axis=np.arange(0,10,1), axis_id=0) + pulse = Pulse(tp=tp, scale=0.5, t=0, pcyc=None) + assert pulse.is_static() + +def test_pulse_plot(): + pulse = Pulse(tp=10, scale=0.5) + pulse._buildFMAM(lambda x: (np.ones_like(x), np.zeros_like(x))) + pulse.plot() + +def test_pulse_exciteprofile(): + pulse = Pulse(tp=10, scale=0.5, flipangle=np.pi/2) + pulse.func = lambda x: (np.ones_like(x), np.zeros_like(x)) + freqs = np.linspace(-.25, .25, 1001) + Mag= pulse.exciteprofile(freqs=freqs) + assert len(Mag[:,0]) == 1001 + assert len(Mag[:,1]) == 1001 + assert len(Mag[:,2]) == 1001 + +def test_rectpulse_init(): + pulse = RectPulse(tp=10, freq=1, t=0, flipangle=np.pi/2) + assert pulse.tp.value == 10 + assert pulse.freq.value == 1 + assert pulse.t.value == 0 + assert pulse.flipangle.value == np.pi/2 + +def test_rectpulse_func(): + pulse = RectPulse(tp=10, freq=1, t=0, flipangle=np.pi/2) + AM, FM = pulse.func(np.arange(10)) + assert np.allclose(AM, np.ones(10)) + assert np.allclose(FM, np.ones(10) ) + +def test_hspulse_init(): + pulse = HSPulse(tp=10, order1=1, order2=6, beta=20, BW=1, init_freq=0) + assert pulse.tp.value == 10 + assert pulse.order1.value == 1 + assert pulse.order2.value == 6 + assert pulse.beta.value == 20 + assert pulse.BW.value == 1 + assert pulse.init_freq.value == 0 + assert pulse.final_freq.value == 1 + +def test_hspulse_func(): + pulse = HSPulse(tp=128, order1=1, order2=6, beta=20, BW=0.1, init_freq=0) + AM, FM = pulse.func(np.arange(10)) + # assert np.allclose(AM, np.ones(10)) + assert np.allclose(FM.min(), 0) + assert np.allclose(FM.max(), 0.1) + diff --git a/tests/test_respro.py b/tests/test_respro.py new file mode 100644 index 0000000..3885568 --- /dev/null +++ b/tests/test_respro.py @@ -0,0 +1,134 @@ +from autodeer.ResPro import * +from autodeer.hardware.dummy import _similate_respro +from autodeer.sequences import ResonatorProfileSequence +from autodeer.dataset import create_dataset_from_sequence, create_dataset_from_axes +from autodeer.FieldSweep import create_Nmodel, FieldSweepAnalysis +import pytest + +def test_ResonatorProfileAnalysis_from_sim(): + def lorenz_fcn(x, centre, sigma): + y = (0.5*sigma)/((x-centre)**2 + (0.5*sigma)**2) + return y + + mode = lambda x: lorenz_fcn(x, 34.0, 34.0/60) + x = np.linspace(33,35) + scale = 75/mode(x).max() + mode_fun = lambda x: lorenz_fcn(x, 34.0, 34.0/60) * scale + seq = ResonatorProfileSequence(B=12220,LO=34,reptime=3e3,averages=1,shots=50,fwidth=0.3) + + + [tp_x, LO_axis], data = _similate_respro(seq,mode_fun) + + dset = create_dataset_from_sequence(data,seq) + + respro = ResonatorProfileAnalysis(dset + ) + # respro.process_nutations(threshold=4) + respro.fit() + assert hasattr(respro,'results') + assert hasattr(respro,'fc') + assert hasattr(respro,'q') + + print(respro.fc) + assert respro.fc == pytest.approx(34,abs=0.1) + assert respro.q == pytest.approx(60,abs=2) + + fig = respro.plot() + assert fig is not None + + + +def test_ceil(): + assert ceil(3.5) == 4 + assert ceil(3.4) == 4 + assert ceil(3.0) == 3 + assert ceil(3.1) == 4 + assert ceil(3.9999999) == 4 + + assert ceil(45,decimals=1) == 45 + assert ceil(45,decimals=-1) == 50 + +def test_floor(): + assert floor(3.5) == 3 + assert floor(3.4) == 3 + assert floor(3.0) == 3 + assert floor(3.1) == 3 + assert floor(3.9999999) == 3 + + assert floor(45,decimals=1) == 45 + assert floor(45,decimals=-1) == 40 + +def test_calc_overlap(): + + def gauss(x,centre,sigma): + return np.exp(-((x-centre)/sigma)**2) + + gauss1 = lambda x: gauss(x,0,1) + gauss2 = lambda x: gauss(x,0.5,1) + + x = np.linspace(-5,5,1000) + overlap = calc_overlap(x,gauss1,gauss2) + + assert overlap == pytest.approx(0.352065) + +def test_BSpline_extra(): + + def gauss(x,centre,sigma): + return np.exp(-((x-centre)/sigma)**2) + + x = np.linspace(-5,5,1000) + y = gauss(x,0,1) + + spl = BSpline_extra((x,y,3)) + x_test = np.linspace(-10,10,100) + y_test = spl(x_test) + + assert y_test[0] == pytest.approx(0,abs=1e-3) + assert y_test[-1] == pytest.approx(0,abs=1e-3) + assert y_test[50] == pytest.approx(1,abs=1e-1) + +@pytest.fixture +def create_fake_respro(): + def lorenz_fcn(x, centre, sigma): + y = (0.5*sigma)/((x-centre)**2 + (0.5*sigma)**2) + return y + + mode = lambda x: lorenz_fcn(x, 34.0, 34.0/60) + x = np.linspace(33,35) + scale = 75/mode(x).max() + mode_fun = lambda x: lorenz_fcn(x, 34.0, 34.0/60) * scale + seq = ResonatorProfileSequence(B=12220,LO=34,reptime=3e3,averages=1,shots=50,fwidth=0.3) + + + [tp_x, LO_axis], data = _similate_respro(seq,mode_fun) + + dset = create_dataset_from_sequence(data,seq) + + respro = ResonatorProfileAnalysis( + nuts = dset.data.T, + freqs = dset.LO.data, + dt=2 + ) + respro.process_nutations(threshold=4) + respro.fit() + return respro + +@pytest.fixture +def create_fake_fieldsweep(): + B_axis = np.linspace(12000, 12400, 100) + Nmodel = create_Nmodel(34.0*1e3) + params = {"B":B_axis/1e1,"az":3.66,"axy":0.488,"gy":2.01,"gz":2.0,"GB":0.45,"scale":1, "Boffset":0.7} + V = Nmodel(**params) + dset = create_dataset_from_axes(V, B_axis,params={"LO":34.0},axes_labels=['B']) + fsweep = FieldSweepAnalysis(dset) + fsweep.calc_gyro() + fsweep.fit() + return fsweep + +def test_optimise_spectra_position(create_fake_fieldsweep,create_fake_respro): + fsweep = create_fake_fieldsweep + respro = create_fake_respro + + new_LO = optimise_spectra_position(respro,fsweep) + + assert new_LO == pytest.approx(-0.47,abs=0.1) #TODO: check this value \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..0f8ea99 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,93 @@ +import numpy as np +from pyepr import Parameter +import pytest +from pyepr.utils import * + +def test_gcd(): + assert gcd([2.0, 4.0, 6.0, 8.0]) == 2 + assert gcd([1.5, 3.0]) == 1.5 + assert gcd([3, 6, 9, 12]) == 3 + assert gcd([5, 10, 15, 20]) == 5 + assert gcd([7, 14, 21, 28]) == 7 + assert gcd([8, 12, 16, 20]) == 4 + assert gcd([10, 20, 30, 40]) == 10 + assert gcd([15, 25, 35, 45]) == 5 + assert gcd([18, 24, 30, 36]) == 6 + assert gcd([21, 28, 35, 42]) == 7 + assert gcd([27, 36, 45, 54]) == 9 + +def test_transpose_dict_of_list(): + d = {'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]} + expected = [{'a': 1, 'b': 4, 'c': 7}, {'a': 2, 'b': 5, 'c': 8}, {'a': 3, 'b': 6, 'c': 9}] + assert transpose_dict_of_list(d) == expected + + d = {'a': [1, 2], 'b': [3, 4], 'c': [5, 6]} + expected = [{'a': 1, 'b': 3, 'c': 5}, {'a': 2, 'b': 4, 'c': 6}] + assert transpose_dict_of_list(d) == expected + + d = {'a': [1], 'b': [2], 'c': [3]} + expected = [{'a': 1, 'b': 2, 'c': 3}] + assert transpose_dict_of_list(d) == expected + + d = {'a': [], 'b': [], 'c': []} + expected = [] + assert transpose_dict_of_list(d) == expected + +def test_transpose_list_of_dicts(): + d = [{'a': 1, 'b': 4, 'c': 7}, {'a': 2, 'b': 5, 'c': 8}, {'a': 3, 'b': 6, 'c': 9}] + expected = {'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]} + assert transpose_list_of_dicts(d) == expected + + d = [{'a': 1, 'b': 3, 'c': 5}, {'a': 2, 'b': 4, 'c': 6}] + expected = {'a': [1, 2], 'b': [3, 4], 'c': [5, 6]} + assert transpose_list_of_dicts(d) == expected + + d = [{'a': 1, 'b': 2, 'c': 3}] + expected = {'a': [1], 'b': [2], 'c': [3]} + assert transpose_list_of_dicts(d) == expected + + d = [] + expected = {} + assert transpose_list_of_dicts(d) == expected + +def test_val_in_ns(): + # Test with no axis + p = Parameter(name='p', value=10, unit="ns") + assert val_in_ns(p) == 10 + p = Parameter(name='p', value=10, unit="us") + assert val_in_ns(p) == 10000 + + # Test with one axis + p = Parameter(name='p', value=10, unit="ns", step=5, dim=1) + assert np.all(val_in_ns(p) == np.array([10])) + + p = Parameter(name='p', value=10, unit="ns", step=5, dim=3) + assert np.all(val_in_ns(p) == np.array([10,15,20])) + + # Test with two axis + p = Parameter(name='p', value=10, unit="ns", step=5, dim=2) + p.add_axis(2,axis=np.array([1,2])) + + with pytest.raises(ValueError): + val_in_ns(p) + +def test_val_in_us(): + # Test with no axis + p = Parameter(name='p', value=10, unit="ns") + assert val_in_us(p) == 0.01 + p = Parameter(name='p', value=10, unit="us") + assert val_in_us(p) == 10 + + # Test with one axis + p = Parameter(name='p', value=10, unit="us", step=5, dim=1) + assert np.all(val_in_us(p) == np.array([10])) + + p = Parameter(name='p', value=10, unit="us", step=5, dim=3) + assert np.all(val_in_us(p) == np.array([10,15,20])) + + # Test with two axis + p = Parameter(name='p', value=10, unit="us", step=5, dim=2) + p.add_axis(2,axis=np.array([1,2])) + + with pytest.raises(ValueError): + val_in_us(p)