2 changes: 1 addition & 1 deletion qick_lib/qick/VERSION
@@ -1 +1 @@
0.2.401
0.2.403
210 changes: 210 additions & 0 deletions qick_lib/qick/qick_asm.py
@@ -12,6 +12,7 @@
from numbers import Number
from abc import ABC, abstractmethod
from tqdm.auto import tqdm
from typing import Generator, Dict, Any, List

from qick import obtain, get_version
from .helpers import to_int, cosine, gauss, triang, DRAG, decode_array, nqz, nyquist_image
@@ -2352,3 +2353,212 @@ def print_sg_mem(self, sg_idx=0, gen_file=False):
else:
print(s)



def stream_acquire(
self,
soc,
rounds: int = 1,
load_envelopes: bool = True,
start_src: str = "internal",
progress: bool = True,
remove_offset: bool = True,
len_normalize: bool = True,
include_full: bool = False,
return_end_of_exp_raw: bool = False,
) -> Generator[Dict[str, Any], None, None]:
"""Stream raw accumulated readout data.

Provides a generator interface that streams raw accumulated readout data incrementally.
Each yield is a dict event describing acquisition progress. The generator's return value
(its StopIteration value) is always None; when return_end_of_exp_raw is True, the raw
buffers aggregated across all rounds are provided in a final yield with event='complete'.

Parameters
----------
soc : QickSoc
Qick object that executes the program
rounds : int, optional
Number of times to rerun the program, averaging results in software (aka "soft averages").
Default is 1.
load_envelopes : bool, optional
If True, load pulse envelopes before executing. Default is True.
start_src : str, optional
Start source for the tProc. Can be "internal" (tProc starts immediately) or "external"
(each round waits for an external trigger). Default is "internal".
progress : bool, optional
If True, display progress bars for rounds and shots per round. Default is True.
remove_offset : bool, optional
If True, subtract the readout's IQ offset from the partial channel data. Default is True.
len_normalize : bool, optional
If True, normalize partial channel data by dividing by the readout window length.
Default is True.
include_full : bool, optional
If True, include the current state of full buffers in each 'data' event. Default is False.
return_end_of_exp_raw : bool, optional
If True, yield a final event with all rounds' raw buffer data. Default is False.

Yields
------
dict
Event dictionaries describing acquisition progress. Possible event forms:

- 'data' event: {'event': 'data', 'round': r, 'rep_slice': (start, stop),
  'partial': {ch: ndarray(new_points, nreads, 2)},
  'buffers': {ch: ndarray(*loop_dims, nreads, 2)}}
  ('buffers' is included only if include_full is True)

- 'round-complete' event: {'event': 'round-complete', 'round': r,
'round_raw': [buffer_per_ch]}

- 'complete' event (yielded only if return_end_of_exp_raw is True):
  {'event': 'complete', 'rounds_raw': list_of_round_buffers}

Returns
-------
None
StopIteration value is None when all rounds complete.
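
Examples
--------
A minimal consumption sketch (assuming `prog` is a program on which
setup_acquire() has already been called, and `soc` is a connected QickSoc)::

    for ev in prog.stream_acquire(soc, rounds=2, return_end_of_exp_raw=True):
        if ev['event'] == 'data':
            start, stop = ev['rep_slice']
            # ev['partial'][ch] has shape (stop - start, nreads, 2)
            print(f"round {ev['round']}: received shots {start}:{stop}")
        elif ev['event'] == 'round-complete':
            round_raw = ev['round_raw']  # per-channel raw buffers for this round
        elif ev['event'] == 'complete':
            all_raw = ev['rounds_raw']  # list of per-round buffer lists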
"""
# Validate that the required acquisition parameters were set via setup_acquire().
if any(x is None for x in [self.counter_addr, self.loop_dims, self.avg_level]):
    raise RuntimeError(
        "data dimensions must be defined with setup_acquire() before calling stream_acquire()"
    )

# Configure acquisition params as in AcquireMixin.acquire(), but with a distinct type tag.
self.acquire_params = {
'type': 'accumulated_stream',
'soc': soc,
'start_src': start_src,
'rounds_remaining': rounds,
'remove_offset': remove_offset,
'hidereps': True, # we manage optional progress display separately
}

total_count = int(np.prod(self.loop_dims)) # total flattened shots
reads_per_shot = [ro['trigs'] for ro in self.ro_chs.values()]

# Preallocate NaN-filled full buffers per channel (same shape as acc_buf in AcquireMixin)
# TODO: consider keeping buffers for all rounds instead of just one?
self.acc_buf = [
np.full((*self.loop_dims, nreads, 2), np.nan, dtype=float)
for nreads in reads_per_shot
]
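# For example (illustrative numbers): loop_dims=(100,) with reads_per_shot=[2]
# preallocates one buffer of shape (100, 2, 2): 100 shots x 2 reads x (I, Q).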

# Raw-buffer snapshots collected across rounds
all_rounds_raw: List[List[np.ndarray]] = []

# Load program & configure (do not load data memory yet; mirror acquire())
self.config_all(soc, load_envelopes=load_envelopes, load_mem=False)

# Progress bar for rounds only (optional)
# Progress bar for rounds only (optional); tqdm is already imported at module level
self.round_track = tqdm(total=rounds, disable=not progress, desc="Rounds")

for round_index in range(rounds):
self.rounds_pbar = tqdm(total=total_count, disable=not progress, desc="Shots per Round", leave=(rounds == 1))
# Prepare firmware for this round (subset of prepare_round logic for accumulated type)
# Raw buffers already allocated; zero tProc counter & configure capture.
self.config_bufs(soc, enable_avg=True, enable_buf=False)
soc.reload_mem()
soc.clear_tproc_counter(addr=self.counter_addr)
soc.start_src(start_src)

# Tracking variables
count = 0 # number of shots (flattened) fully received

# Start combined readout capture (similar to AcquireMixin.finish_round accumulated section)
soc.start_readout(
total_count,
counter_addr=self.counter_addr,
ch_list=list(self.ro_chs),
reads_per_shot=reads_per_shot,
)

# Poll until all shots acquired
while count < total_count:
new_data = obtain(soc.poll_data())  # expect an iterable of (new_points, (d, stats))
new_data = list(new_data)  # materialize if poll_data returned a generator
if not new_data:
    continue  # nothing arrived yet
for new_points, (d_list, stats) in new_data:
if new_points <= 0:
continue
# Package incremental partial arrays per channel
partial: Dict[int, np.ndarray] = {}
ro_ch_nums = list(self.ro_chs)
for ii, nreads in enumerate(reads_per_shot):
    ch_num = ro_ch_nums[ii]
    flat = d_list[ii]
    # Expect flat shape (new_points * nreads, 2)
    if flat.shape[0] != new_points * nreads:
        logger.error(
            "data size mismatch: new_points=%d, nreads=%d, data shape %s",
            new_points,
            nreads,
            flat.shape,
        )
    if count + new_points > total_count:
        logger.error(
            "got too much data: count=%d, new_points=%d, total_count=%d",
            count,
            new_points,
            total_count,
        )
    start_flat = count * nreads
    stop_flat = (count + new_points) * nreads
    # Assign into the preallocated buffer (reshape view for contiguous assignment)
    self.acc_buf[ii].reshape((-1, 2))[start_flat:stop_flat] = flat
    # Reshape the new data to (new_points, nreads, 2)
    partial_channel = flat.reshape(new_points, nreads, 2).astype(float)
    if len_normalize:
        # Normalize by the readout window length
        partial_channel /= self.ro_chs[ch_num]['length']
    if remove_offset and not np.isnan(partial_channel).all():
        # Subtract the readout's DC offset (_ro_offset returns the offset
        # scaled to raw accumulated counts)
        rocfg = self.ro_chs[ch_num].get('ro_config')
        partial_channel -= self._ro_offset(ch_num, rocfg)
    partial[ch_num] = partial_channel
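# rep_slice marks which flattened shots this event covered: reshaping a full
# buffer to (-1, nreads, 2) and slicing [start:stop] gives this event's raw values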
rep_slice = (count, count + new_points)
count += new_points

yield_event = {
'event': 'data',
'round': round_index,
'rep_slice': rep_slice,
'partial': partial,
}
if include_full:
# Provide current state of full buffers (copy to avoid external mutation)
yield_event['buffers'] = {
ch: buf.copy() for ch, buf in zip(self.ro_chs.keys(), self.acc_buf)
}
self.rounds_pbar.update(new_points)
yield yield_event

# Round complete; snapshot raw buffers for this round (copy to freeze state)
round_raw = [buf.copy() for buf in self.acc_buf]
all_rounds_raw.append(round_raw)
yield {
'event': 'round-complete',
'round': round_index,
'round_raw': round_raw,
}
# Prepare for next round (reset buffers to NaN if more rounds remaining)
if round_index < rounds - 1:
for buf in self.acc_buf:
    buf.fill(np.nan)

self.rounds_pbar.close()
self.round_track.update(1)
self.round_track.close()
# Final event summarizing all rounds' raw data (emitted only when requested)
if return_end_of_exp_raw:
yield {
'event': 'complete',
'rounds_raw': all_rounds_raw,
}