Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 35 additions & 28 deletions src/console/spcm_control/tx_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,19 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None:
min_ring_buffer_size = int(np.ceil(self.data_buffer_size / notify_size.value) * notify_size.value)
# Create page-aligned ring buffer
ring_buffer = create_dma_buffer(min(self.max_ring_buffer_size.value, min_ring_buffer_size))
# Get the size of the ring buffer
ring_buffer_size = spcm.uint64(len(ring_buffer))

try:
# Get the memory location of the data buffer and ring buffer
data_buffer_addr = ctypes.cast(data_buffer, ctypes.c_void_p).value
ring_buffer_addr = ctypes.cast(ring_buffer, ctypes.c_void_p).value
if not data_buffer_addr or not ring_buffer_addr:
raise RuntimeError("Could not get ring or data buffer position.")
except RuntimeError as err:
self.log.exception(err, exc_info=True)
raise err

try:
# Check if ring buffer size is multiple of 2*num_ch (2 bytes per sample per channel)
if ring_buffer_size.value % (self.num_ch * 2) != 0:
Expand All @@ -369,23 +380,17 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None:
notify_size.value,
)

try:
# Perform initial memory transfer: Fill the whole ring buffer
if (_ring_buffer_pos := ctypes.cast(ring_buffer, ctypes.c_void_p).value) and \
(_data_buffer_pos := ctypes.cast(data_buffer, ctypes.c_void_p).value):
if self.data_buffer_size < ring_buffer_size.value:
ctypes.memmove(_ring_buffer_pos, _data_buffer_pos, self.data_buffer_size)
transferred_bytes = self.data_buffer_size
else:
ctypes.memmove(_ring_buffer_pos, _data_buffer_pos, ring_buffer_size.value)
transferred_bytes = ring_buffer_size.value
else:
raise RuntimeError("Could not get ring or data buffer position.")
except RuntimeError as err:
self.log.exception(err, exc_info=True)
raise err
# Perform initial memory transfer
# If the data buffer fits in the ring buffer, copy the entire data buffer
if self.data_buffer_size < ring_buffer_size.value:
ctypes.memmove(ring_buffer_addr, data_buffer_addr, self.data_buffer_size)
transferred_bytes = self.data_buffer_size
else:
# Otherwise fill the ring buffer completely with data
ctypes.memmove(ring_buffer_addr, data_buffer_addr, ring_buffer_size.value)
transferred_bytes = ring_buffer_size.value

# Perform initial data transfer to completely fill continuous buffer
# Define the transfer buffer
spcm.spcm_dwDefTransfer_i64(
self.card,
spcm.SPCM_BUF_DATA,
Expand All @@ -396,9 +401,11 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None:
ring_buffer_size,
)

# Tell the card how much data is available
self.handle_error(spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, ring_buffer_size))

self.log.debug("Starting card memory transfer")
# Start data transfer from ring_buffer and wait until (buffer) data transfer completion
self.handle_error(spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPC_M2CMD,
Expand All @@ -418,24 +425,22 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None:
transfer_count = 0

while (transferred_bytes < self.data_buffer_size) and not self.is_running.is_set():
# Read available bytes and user position
# Read number of available bytes to write to on card
spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_DATA_AVAIL_USER_LEN, ctypes.byref(avail_bytes))
# Read where those available bytes are located in memory
spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_DATA_AVAIL_USER_POS, ctypes.byref(usr_position))

# Calculate new data for the transfer, when notify_size is available on continous buffer
# Only transfer when notify_size number of bytes are available
if avail_bytes.value >= notify_size.value:
transfer_count += 1

ring_buffer_position = ctypes.cast((
ctypes.c_char * (self.max_ring_buffer_size.value - usr_position.value)).from_buffer(
ring_buffer, usr_position.value), ctypes.c_void_p
).value

current_data_buffer = ctypes.cast(data_buffer, ctypes.c_void_p).value
# Calculate position within ring buffer
ring_buffer_position = ring_buffer_addr + usr_position.value

# Get new buffer positions
if ring_buffer_position and current_data_buffer:
data_buffer_position = current_data_buffer + transferred_bytes
if ring_buffer_position and data_buffer_addr:
# Calculate current position/progress in original data array
data_buffer_position = data_buffer_addr + transferred_bytes

if (bytes_remaining := self.data_buffer_size - transferred_bytes) >= notify_size.value:
# Enough data available -> copy notify size
Expand All @@ -445,7 +450,7 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None:
notify_size.value,
)
else:
# Not enough data availabe -> set remaining bytes to zero
# Not enough data availabe -> Copy available data, set remaining bytes to zero
ctypes.memmove(
ring_buffer_position,
data_buffer_position,
Expand All @@ -457,12 +462,14 @@ def _fifo_stream_worker(self, data: np.ndarray) -> None:
notify_size.value - bytes_remaining,
)

# Tell card notify_size amount of data is available
self.handle_error(
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, notify_size)
)
transferred_bytes += notify_size.value

# Wait for data transfer to complete
self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_DATA_WAITDMA))

# Wait for data transfer to complete
self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_DATA_WAITDMA))
self.log.debug("Card operation stopped")
Loading