Skip to content

Commit c42169f

Browse files
committed
Compute progress in core extension
1 parent 23481d8 commit c42169f

File tree

4 files changed

+126
-8
lines changed

4 files changed

+126
-8
lines changed

crates/core/src/sync/sync_status.rs

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use alloc::{
22
boxed::Box,
33
collections::{btree_map::BTreeMap, btree_set::BTreeSet},
4+
format,
45
rc::Rc,
56
string::String,
67
vec::Vec,
@@ -9,9 +10,13 @@ use core::{
910
cell::RefCell,
1011
cmp::min,
1112
hash::{BuildHasher, Hash},
13+
ops::AddAssign,
1214
};
1315
use rustc_hash::FxBuildHasher;
14-
use serde::Serialize;
16+
use serde::{
17+
Serialize,
18+
ser::{SerializeMap, SerializeStruct},
19+
};
1520
use sqlite_nostd::ResultCode;
1621

1722
use crate::{
@@ -28,7 +33,7 @@ use super::{
2833
};
2934

3035
/// Information about a progressing download.
31-
#[derive(Serialize, Hash)]
36+
#[derive(Hash)]
3237
pub struct DownloadSyncStatus {
3338
/// Whether the socket to the sync service is currently open and connected.
3439
///
@@ -136,6 +141,72 @@ impl Default for DownloadSyncStatus {
136141
}
137142
}
138143

144+
impl Serialize for DownloadSyncStatus {
145+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
146+
where
147+
S: serde::Serializer,
148+
{
149+
struct SerializeStreamsWithProgress<'a>(&'a DownloadSyncStatus);
150+
151+
impl<'a> Serialize for SerializeStreamsWithProgress<'a> {
152+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
153+
where
154+
S: serde::Serializer,
155+
{
156+
#[derive(Serialize)]
157+
struct StreamWithProgress<'a> {
158+
#[serde(flatten)]
159+
subscription: &'a ActiveStreamSubscription,
160+
progress: ProgressCounters,
161+
}
162+
163+
let streams = self.0.streams.iter().map(|sub| {
164+
let mut stream_progress = ProgressCounters::default();
165+
if let Some(sync_progress) = &self.0.downloading {
166+
for bucket in &sub.associated_buckets {
167+
if let Some(bucket_progress) = sync_progress.buckets.get(bucket) {
168+
stream_progress += bucket_progress;
169+
}
170+
}
171+
}
172+
173+
StreamWithProgress {
174+
subscription: sub,
175+
progress: stream_progress,
176+
}
177+
});
178+
179+
serializer.collect_seq(streams)
180+
}
181+
}
182+
183+
let mut serializer = serializer.serialize_struct("DownloadSyncStatus", 4)?;
184+
serializer.serialize_field("connected", &self.connected)?;
185+
serializer.serialize_field("connecting", &self.connecting)?;
186+
serializer.serialize_field("priority_status", &self.priority_status)?;
187+
serializer.serialize_field("downloading", &self.downloading)?;
188+
serializer.serialize_field("streams", &SerializeStreamsWithProgress(self))?;
189+
190+
serializer.end()
191+
}
192+
}
193+
194+
#[derive(Serialize, Default)]
195+
struct ProgressCounters {
196+
total: i64,
197+
downloaded: i64,
198+
}
199+
200+
impl<'a> AddAssign<&'a BucketProgress> for ProgressCounters {
201+
fn add_assign(&mut self, rhs: &'a BucketProgress) {
202+
let downloaded = rhs.since_last;
203+
let total = rhs.target_count - rhs.at_last;
204+
205+
self.total += total;
206+
self.downloaded += downloaded;
207+
}
208+
}
209+
139210
pub struct SyncStatusContainer {
140211
status: Rc<RefCell<DownloadSyncStatus>>,
141212
last_published_hash: u64,
@@ -204,11 +275,57 @@ pub struct BucketProgress {
204275
pub target_count: i64,
205276
}
206277

207-
#[derive(Serialize, Hash)]
278+
#[derive(Hash)]
208279
pub struct SyncDownloadProgress {
209280
buckets: BTreeMap<String, BucketProgress>,
210281
}
211282

283+
impl Serialize for SyncDownloadProgress {
284+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
285+
where
286+
S: serde::Serializer,
287+
{
288+
// When we publish a SyncDownloadProgress to clients, avoid serializing every bucket since
289+
// that can lead to very large status updates.
290+
// Instead, we report one entry per priority group.
291+
let mut by_priority = BTreeMap::<BucketPriority, ProgressCounters>::new();
292+
for progress in self.buckets.values() {
293+
let priority_progress = by_priority.entry(progress.priority).or_default();
294+
*priority_progress += progress;
295+
}
296+
297+
// We used to serialize SyncDownloadProgress as-is. To keep backwards-compatibility with the
298+
// general format, we're now synthesizing a fake bucket id for each priority and then report
299+
// each priority as a single-bucket item. This allows keeping client logic unchanged.
300+
struct SerializeWithFakeBucketNames(BTreeMap<BucketPriority, ProgressCounters>);
301+
302+
impl Serialize for SerializeWithFakeBucketNames {
303+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
304+
where
305+
S: serde::Serializer,
306+
{
307+
let mut serializer = serializer.serialize_map(Some(self.0.len()))?;
308+
for (priority, progress) in &self.0 {
309+
serializer.serialize_entry(
310+
&format!("prio_{}", priority.number),
311+
&BucketProgress {
312+
priority: *priority,
313+
at_last: 0,
314+
since_last: progress.downloaded,
315+
target_count: progress.total,
316+
},
317+
)?;
318+
}
319+
serializer.end()
320+
}
321+
}
322+
323+
let mut serializer = serializer.serialize_struct("SyncDownloadProgress", 1)?;
324+
serializer.serialize_field("buckets", &SerializeWithFakeBucketNames(by_priority))?;
325+
serializer.end()
326+
}
327+
}
328+
212329
pub struct SyncProgressFromCheckpoint {
213330
pub progress: SyncDownloadProgress,
214331
pub needs_counter_reset: bool,
@@ -282,6 +399,7 @@ pub struct ActiveStreamSubscription {
282399
pub id: i64,
283400
pub name: String,
284401
pub parameters: Option<Box<JsonString>>,
402+
#[serde(skip)]
285403
pub associated_buckets: BTreeSet<String>,
286404
pub priority: Option<BucketPriority>,
287405
pub active: bool,

dart/test/goldens/simple_iteration.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
"priority_status": [],
5858
"downloading": {
5959
"buckets": {
60-
"a": {
60+
"prio_3": {
6161
"priority": 3,
6262
"at_last": 0,
6363
"since_last": 0,
@@ -107,7 +107,7 @@
107107
"priority_status": [],
108108
"downloading": {
109109
"buckets": {
110-
"a": {
110+
"prio_3": {
111111
"priority": 3,
112112
"at_last": 0,
113113
"since_last": 1,

dart/test/sync_stream_test.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ void main() {
9090
{
9191
'name': 'my_default_stream',
9292
'parameters': null,
93-
'associated_buckets': ['a'],
93+
'progress': {'total': 1, 'downloaded': 0},
9494
'active': true,
9595
'is_default': true,
9696
'has_explicit_subscription': false,
@@ -638,7 +638,7 @@ void main() {
638638
'name': 'a',
639639
'parameters': null,
640640
// not persisted, only needed for download progress
641-
'associated_buckets': [],
641+
'progress': {'total': 0, 'downloaded': 0},
642642
'priority': null, // same
643643
'active': true,
644644
'is_default': false,

dart/test/sync_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ void _syncTests<T>({
808808
'downloading',
809809
{
810810
'buckets': {
811-
'a': {
811+
'prio_3': {
812812
'priority': 3,
813813
'at_last': 0,
814814
'since_last': 1,

0 commit comments

Comments
 (0)