Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Card status fixed, and new method to get raw waveforms from digitisers #59

Merged
merged 2 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions src/spectrumdevice/devices/digitiser/digitiser_card.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
from typing import List, Optional, Sequence, cast

from numpy import float_, mod, squeeze, zeros
from numpy import float_, int16, mod, squeeze, zeros
from numpy.typing import NDArray

from spectrum_gmbh.py_header.regs import (
Expand Down Expand Up @@ -105,8 +105,8 @@ def wait_for_acquisition_to_complete(self) -> None:
"""
self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY)

def get_waveforms(self) -> List[List[NDArray[float_]]]:
"""Get a list of the most recently transferred waveforms, in channel order.
def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
"""Get a list of the most recently transferred waveforms, in channel order, as 16-bit integers.

This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays
(waveforms) and returns the list.
Expand All @@ -120,7 +120,7 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]:
this would the rate at which your trigger source was running).

Returns:
waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
and one array per enabled channel, in channel order. To average the acquisitions:
`np.array(waveforms).mean(axis=0)`

Expand Down Expand Up @@ -167,17 +167,34 @@ def get_waveforms(self) -> List[List[NDArray[float_]]]:
(self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums))
)

repeat_acquisitions = []
for n in range(self._batch_size):
repeat_acquisitions.append([waveform for waveform in waveforms_in_columns[n, :, :].T])

return repeat_acquisitions

def get_waveforms(self) -> List[List[NDArray[float_]]]:
"""Get a list of the most recently transferred waveforms, in channel order, in Volts as floats.

See get_raw_waveforms() for details.

Returns:
waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
and one array per enabled channel, in channel order. To average the acquisitions:
`np.array(waveforms).mean(axis=0)`

"""
raw_repeat_acquisitions = self.get_raw_waveforms()
repeat_acquisitions = []
for n in range(self._batch_size):
repeat_acquisitions.append(
[
cast(
SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num]
).convert_raw_waveform_to_voltage_waveform(squeeze(waveform))
for ch_num, waveform in zip(self.enabled_analog_channel_nums, waveforms_in_columns[n, :, :].T)
for ch_num, waveform in zip(self.enabled_analog_channel_nums, raw_repeat_acquisitions[n])
]
)

return repeat_acquisitions

def get_timestamp(self) -> Optional[datetime.datetime]:
Expand Down
6 changes: 5 additions & 1 deletion src/spectrumdevice/devices/digitiser/digitiser_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from datetime import datetime
from typing import List, Optional

from numpy import float_, ndarray
from numpy import float_, int16, ndarray
from numpy.typing import NDArray

from spectrumdevice.devices.abstract_device.device_interface import SpectrumDeviceInterface
Expand Down Expand Up @@ -104,6 +104,10 @@ def execute_finite_fifo_acquisition(self, num_measurements: int) -> List[Measure
def execute_continuous_fifo_acquisition(self) -> None:
raise NotImplementedError()

@abstractmethod
def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
raise NotImplementedError()

@abstractmethod
def get_waveforms(self) -> List[List[NDArray[float_]]]:
raise NotImplementedError()
Expand Down
37 changes: 28 additions & 9 deletions src/spectrumdevice/devices/digitiser/digitiser_star_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT.
import datetime
from threading import Thread
from typing import Dict, List, Optional, Sequence
from typing import Callable, Dict, List, Optional, Sequence, TypeVar

from numpy import float_
from numpy import float_, int16
from numpy.typing import NDArray

from spectrumdevice.devices.abstract_device import (
Expand All @@ -23,6 +23,10 @@
from spectrumdevice.settings.device_modes import AcquisitionMode


WAVEFORM_TYPE_VAR = TypeVar("WAVEFORM_TYPE_VAR", NDArray[float_], NDArray[int16])


# noinspection PyTypeChecker
class SpectrumDigitiserStarHub(
AbstractSpectrumStarHub[
SpectrumDigitiserCard, SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface
Expand Down Expand Up @@ -70,23 +74,38 @@ def wait_for_acquisition_to_complete(self) -> None:
card.wait_for_acquisition_to_complete()

def get_waveforms(self) -> List[List[NDArray[float_]]]:
"""Get a list of the most recently transferred waveforms.
"""Get a list of the most recently transferred waveforms, as floating point voltages.

This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
See `SpectrumDigitiserCard.get_waveforms()` for more information.

Args:
num_acquisitions (int): For FIFO mode: the number of acquisitions (i.e. trigger events) to wait for and
copy. Acquiring in batches (num_acquisitions > 1) can improve performance.

Returns:
waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition,
and one array per enabled channel, in channel order.
"""
card_ids_and_waveform_sets: Dict[str, list[list[NDArray[float_]]]] = {}
return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_waveforms)

def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
"""Get a list of the most recently transferred waveforms, as integers.

This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
See `SpectrumDigitiserCard.get_waveforms()` for more information.

Returns:
waveforms (List[List[NDArray[int16]]]): A list lists of 1D numpy arrays, one inner list per acquisition,
and one array per enabled channel, in channel order.
"""
return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_raw_waveforms)

def _get_waveforms_in_threads(
self, get_waveforms_method: Callable[[SpectrumDigitiserCard], List[List[WAVEFORM_TYPE_VAR]]]
) -> List[List[WAVEFORM_TYPE_VAR]]:
"""Gets waveforms from child cards in separate threads, using the SpectrumDigitiserCard method provided."""

card_ids_and_waveform_sets: Dict[str, list[list[WAVEFORM_TYPE_VAR]]] = {}

def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None:
this_cards_waveforms = digitiser_card.get_waveforms()
this_cards_waveforms = get_waveforms_method(digitiser_card)
card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms

threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards]
Expand Down
4 changes: 1 addition & 3 deletions src/spectrumdevice/settings/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,4 @@ class StatusCode(Enum):
def decode_status(code: int) -> CARD_STATUS_TYPE:
"""Converts the integer value received by a card when queried about its status to a list of StatusCodes."""
possible_codes = [code.value for code in StatusCode]
return CARD_STATUS_TYPE(
[StatusCode(found_code) for found_code in decode_bitmap_using_list_of_ints(code, possible_codes)]
)
return [StatusCode(found_code) for found_code in decode_bitmap_using_list_of_ints(code, possible_codes)]
Loading