@@ -280,9 +280,14 @@ public DeltaWatch createDeltaWatch(
280280 return watch ;
281281 }
282282 } else if (hasClusterChanged && requestResourceType .equals (ResourceType .ENDPOINT )) {
283- ResponseState responseState = respondDeltaTracked (
283+ Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (request .getResourceType ());
284+ List <String > removedResources = findRemovedResources (watch ,
285+ snapshotResources );
286+ Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotResources );
287+ ResponseState responseState = respondDelta (
284288 watch ,
285- snapshot .resources (request .getResourceType ()),
289+ changedResources ,
290+ removedResources ,
286291 version ,
287292 group );
288293 if (responseState .equals (ResponseState .RESPONDED ) || responseState .equals (ResponseState .CANCELLED )) {
@@ -304,8 +309,13 @@ public DeltaWatch createDeltaWatch(
304309 }
305310
306311 // Otherwise, version is different, the watch may be responded immediately
307- ResponseState responseState = respondDeltaTracked (watch ,
308- snapshot .resources (request .getResourceType ()),
312+ Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (request .getResourceType ());
313+ List <String > removedResources = findRemovedResources (watch ,
314+ snapshotResources );
315+ Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotResources );
316+ ResponseState responseState = respondDelta (watch ,
317+ changedResources ,
318+ removedResources ,
309319 version ,
310320 group );
311321 if (responseState .equals (ResponseState .RESPONDED ) || responseState .equals (ResponseState .CANCELLED )) {
@@ -470,8 +480,10 @@ protected void respondWithSpecificOrder(T group,
470480 .filter (s -> watch .trackedResources ().get (s ) != null )
471481 .collect (Collectors .toList ());
472482
473- ResponseState responseState = respondDeltaTracked (watch ,
474- snapshotChangedResources ,
483+ Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotChangedResources );
484+
485+ ResponseState responseState = respondDelta (watch ,
486+ changedResources ,
475487 removedResources ,
476488 version ,
477489 group );
@@ -551,22 +563,30 @@ private boolean respond(Watch watch, U snapshot, T group) {
551563 return false ;
552564 }
553565
554- /**
555- * Responds a delta watch using resource version comparison.
556- *
557- * @return if the watch has been responded.
558- */
559- private ResponseState respondDeltaTracked (DeltaWatch watch ,
560- Map <String , SnapshotResource <?>> snapshotResources ,
561- String version ,
562- T group ) {
566+ private List <String > findRemovedResources (DeltaWatch watch , Map <String , SnapshotResource <?>> snapshotResources ) {
563567 // remove resources for which client has a tracked version but do not exist in snapshot
564- List < String > removedResources = watch .trackedResources ().keySet ()
568+ return watch .trackedResources ().keySet ()
565569 .stream ()
566570 .filter (s -> !snapshotResources .containsKey (s ))
567571 .collect (Collectors .toList ());
572+ }
568573
569- return respondDeltaTracked (watch , snapshotResources , removedResources , version , group );
574+ private Map <String , SnapshotResource <?>> findChangedResources (DeltaWatch watch ,
575+ Map <String , SnapshotResource <?>> snapshotResources ) {
576+ return snapshotResources .entrySet ()
577+ .stream ()
578+ .filter (entry -> {
579+ if (watch .pendingResources ().contains (entry .getKey ())) {
580+ return true ;
581+ }
582+ String resourceVersion = watch .trackedResources ().get (entry .getKey ());
583+ if (resourceVersion == null ) {
584+ // resource is not tracked, should respond it only if watch is wildcard
585+ return watch .isWildcard ();
586+ }
587+ return !entry .getValue ().version ().equals (resourceVersion );
588+ })
589+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
570590 }
571591
572592 private ResponseState respondDeltaTracked (DeltaWatch watch ,
0 commit comments