Skip to content
This repository was archived by the owner on Nov 25, 2024. It is now read-only.

Commit 00217a6

Browse files
authored
Only fetch events once for all rooms (#3311)
This refactors `PDUStreamProvider` a bit so that it doesn't trigger a database query per room, but instead utilizes the fact that it's possible to bulk query. This improves sync performance significantly when you have 1000s of rooms. ### Pull Request Checklist <!-- Please read https://matrix-org.github.io/dendrite/development/contributing before submitting your pull request --> * [x] I have added Go unit tests or [Complement integration tests](https://github.com/matrix-org/complement) for this PR _or_ I have justified why this PR doesn't need tests * [x] Pull request includes a [sign off below using a legally identifiable name](https://matrix-org.github.io/dendrite/development/contributing#sign-off) _or_ I have already signed off privately Signed-off-by: `Joakim Recht <[email protected]>`
1 parent d58daf9 commit 00217a6

File tree

1 file changed

+70
-24
lines changed

1 file changed

+70
-24
lines changed

syncapi/streams/stream_pdu.go

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,12 @@ func (p *PDUStreamProvider) IncrementalSync(
203203
req.Log.WithError(err).Error("unable to update event filter with ignored users")
204204
}
205205

206+
dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, snapshot)
207+
if err != nil {
208+
req.Log.WithError(err).Error("unable to get recent events")
209+
return r.From
210+
}
211+
206212
newPos = from
207213
for _, delta := range stateDeltas {
208214
newRange := r
@@ -218,7 +224,7 @@ func (p *PDUStreamProvider) IncrementalSync(
218224
}
219225
}
220226
var pos types.StreamPosition
221-
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
227+
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil {
222228
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
223229
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
224230
return newPos
@@ -240,6 +246,66 @@ func (p *PDUStreamProvider) IncrementalSync(
240246
return newPos
241247
}
242248

249+
func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) {
250+
var roomIDs []string
251+
var newlyJoinedRoomIDs []string
252+
for _, delta := range stateDeltas {
253+
if delta.NewlyJoined {
254+
newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID)
255+
} else {
256+
roomIDs = append(roomIDs, delta.RoomID)
257+
}
258+
}
259+
dbEvents := make(map[string]types.RecentEvents)
260+
if len(roomIDs) > 0 {
261+
events, err := snapshot.RecentEvents(
262+
ctx, roomIDs, r,
263+
&eventFilter, true, true,
264+
)
265+
if err != nil {
266+
if err != sql.ErrNoRows {
267+
return nil, err
268+
}
269+
}
270+
for k, v := range events {
271+
dbEvents[k] = v
272+
}
273+
}
274+
if len(newlyJoinedRoomIDs) > 0 {
275+
// For rooms that were joined in this sync, try to fetch
276+
// as much timeline events as allowed by the filter.
277+
278+
filter := eventFilter
279+
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
280+
if eventFilter.Limit < recentEventBackwardsLimit {
281+
filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
282+
diff := r.From - r.To
283+
if diff > 0 && diff < recentEventBackwardsLimit {
284+
filter.Limit = int(diff)
285+
}
286+
}
287+
288+
events, err := snapshot.RecentEvents(
289+
ctx, newlyJoinedRoomIDs, types.Range{
290+
From: r.To,
291+
To: 0,
292+
Backwards: true,
293+
},
294+
&filter, true, true,
295+
)
296+
if err != nil {
297+
if err != sql.ErrNoRows {
298+
return nil, err
299+
}
300+
}
301+
for k, v := range events {
302+
dbEvents[k] = v
303+
}
304+
}
305+
306+
return dbEvents, nil
307+
}
308+
243309
// Limit the recent events to X when going backwards
244310
const recentEventBackwardsLimit = 100
245311

@@ -253,29 +319,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
253319
eventFilter *synctypes.RoomEventFilter,
254320
stateFilter *synctypes.StateFilter,
255321
req *types.SyncRequest,
322+
dbEvents map[string]types.RecentEvents,
256323
) (types.StreamPosition, error) {
257324
var err error
258-
originalLimit := eventFilter.Limit
259-
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
260-
if r.Backwards && originalLimit < recentEventBackwardsLimit {
261-
eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
262-
diff := r.From - r.To
263-
if diff > 0 && diff < recentEventBackwardsLimit {
264-
eventFilter.Limit = int(diff)
265-
}
266-
}
267-
268-
dbEvents, err := snapshot.RecentEvents(
269-
ctx, []string{delta.RoomID}, r,
270-
eventFilter, true, true,
271-
)
272-
if err != nil {
273-
if err == sql.ErrNoRows {
274-
return r.To, nil
275-
}
276-
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
277-
}
278-
279325
recentStreamEvents := dbEvents[delta.RoomID].Events
280326
limited := dbEvents[delta.RoomID].Limited
281327

@@ -337,9 +383,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
337383
logrus.WithError(err).Error("unable to apply history visibility filter")
338384
}
339385

340-
if r.Backwards && len(events) > originalLimit {
386+
if r.Backwards && len(events) > eventFilter.Limit {
341387
// We're going backwards and the events are ordered chronologically, so take the last `limit` events
342-
events = events[len(events)-originalLimit:]
388+
events = events[len(events)-eventFilter.Limit:]
343389
limited = true
344390
}
345391

0 commit comments

Comments
 (0)