11import { type Client , connect } from "./client" ;
22import { Patterns } from "./patterns" ;
3- import { updateInterest , type InterestUpdate } from "@cocalc/conat/core/server" ;
3+ import {
4+ updateInterest ,
5+ updateSticky ,
6+ type InterestUpdate ,
7+ type StickyUpdate ,
8+ } from "@cocalc/conat/core/server" ;
49import type { DStream } from "@cocalc/conat/sync/dstream" ;
510import { once } from "@cocalc/util/async-utils" ;
611import { server as createPersistServer } from "@cocalc/conat/persist/server" ;
@@ -42,12 +47,14 @@ export async function clusterLink(
4247 return link ;
4348}
4449
50+ export type Sticky = { [ pattern : string ] : { [ subject : string ] : string } } ;
4551export type Interest = Patterns < { [ queue : string ] : Set < string > } > ;
4652
4753export { type ClusterLink } ;
4854
4955class ClusterLink {
5056 public interest : Interest = new Patterns ( ) ;
57+ public sticky : Sticky = { } ;
5158 private streams : ClusterStreams ;
5259 private state : "init" | "ready" | "closed" = "init" ;
5360 private clientStateChanged = Date . now ( ) ; // when client status last changed
@@ -78,7 +85,10 @@ class ClusterLink {
7885 clusterName : this . clusterName ,
7986 } ) ;
8087 for ( const update of this . streams . interest . getAll ( ) ) {
81- updateInterest ( update , this . interest ) ;
88+ updateInterest ( update , this . interest , this . sticky ) ;
89+ }
90+ for ( const update of this . streams . sticky . getAll ( ) ) {
91+ updateSticky ( update , this . sticky ) ;
8292 }
8393 // I have a slight concern about this because updates might not
8494 // arrive in order during automatic failover. That said, maybe
@@ -87,6 +97,7 @@ class ClusterLink {
8797 // it is about, and when that server goes down none of this state
8898 // matters anymore.
8999 this . streams . interest . on ( "change" , this . handleInterestUpdate ) ;
100+ this . streams . sticky . on ( "change" , this . handleStickyUpdate ) ;
90101 this . state = "ready" ;
91102 } ;
92103
@@ -95,7 +106,11 @@ class ClusterLink {
95106 } ;
96107
97108 handleInterestUpdate = ( update : InterestUpdate ) => {
98- updateInterest ( update , this . interest ) ;
109+ updateInterest ( update , this . interest , this . sticky ) ;
110+ } ;
111+
112+ handleStickyUpdate = ( update : StickyUpdate ) => {
113+ updateSticky ( update , this . sticky ) ;
99114 } ;
100115
101116 private handleClientStateChanged = ( ) => {
@@ -119,6 +134,7 @@ class ClusterLink {
119134 if ( this . streams != null ) {
120135 this . streams . interest . removeListener ( "change" , this . handleInterestUpdate ) ;
121136 this . streams . interest . close ( ) ;
137+ this . streams . sticky . close ( ) ;
122138 // @ts -ignore
123139 delete this . streams ;
124140 }
@@ -162,9 +178,10 @@ class ClusterLink {
162178 return false ;
163179 } ;
164180
165- hash = ( ) : { interest : number } => {
181+ hash = ( ) : { interest : number ; sticky : number } => {
166182 return {
167183 interest : hashInterest ( this . interest ) ,
184+ sticky : hashSticky ( this . sticky ) ,
168185 } ;
169186 } ;
170187}
@@ -178,6 +195,7 @@ function clusterStreamNames({
178195} ) {
179196 return {
180197 interest : `cluster/${ clusterName } /${ id } /interest` ,
198+ sticky : `cluster/${ clusterName } /${ id } /sticky` ,
181199 } ;
182200}
183201
@@ -207,6 +225,7 @@ export async function createClusterPersistServer({
207225
208226export interface ClusterStreams {
209227 interest : DStream < InterestUpdate > ;
228+ sticky : DStream < StickyUpdate > ;
210229}
211230
212231export async function clusterStreams ( {
@@ -233,21 +252,27 @@ export async function clusterStreams({
233252 name : names . interest ,
234253 ...opts ,
235254 } ) ;
255+ const sticky = await client . sync . dstream < StickyUpdate > ( {
256+ noInventory : true ,
257+ name : names . sticky ,
258+ ...opts ,
259+ } ) ;
236260 logger . debug ( "clusterStreams: got them" , { clusterName } ) ;
237- return { interest } ;
261+ return { interest, sticky } ;
238262}
239263
240264// Periodically delete not-necessary updates from the interest stream
241265export async function trimClusterStreams (
242266 streams : ClusterStreams ,
243267 data : {
244268 interest : Patterns < { [ queue : string ] : Set < string > } > ;
269+ sticky : { [ pattern : string ] : { [ subject : string ] : string } } ;
245270 links : { interest : Patterns < { [ queue : string ] : Set < string > } > } [ ] ;
246271 } ,
247272 // don't delete anything that isn't at lest minAge ms old.
248273 minAge : number ,
249- ) : Promise < { seqsInterest : number [ ] } > {
250- const { interest } = streams ;
274+ ) : Promise < { seqsInterest : number [ ] ; seqsSticky : number [ ] } > {
275+ const { interest, sticky } = streams ;
251276 // First deal with interst
252277 // we iterate over the interest stream checking for subjects
253278 // with no current interest at all; in such cases it is safe
@@ -275,7 +300,45 @@ export async function trimClusterStreams(
275300 logger . debug ( "trimClusterStream: successfully trimmed interest" , { seqs } ) ;
276301 }
277302
278- return { seqsInterest : seqs } ;
303+ // Next deal with sticky -- trim ones where the pattern is no longer of interest.
304+ // There could be other reasons to trim but it gets much trickier. This one is more
305+ // obvious, except we have to check for any interest in the whole cluster, not
306+ // just this node.
307+ const seqs2 : number [ ] = [ ] ;
308+ function noInterest ( pattern : string ) {
309+ if ( data . interest . hasPattern ( pattern ) ) {
310+ return false ;
311+ }
312+ for ( const link of data . links ) {
313+ if ( link . interest . hasPattern ( pattern ) ) {
314+ return false ;
315+ }
316+ }
317+ // nobody cares
318+ return true ;
319+ }
320+ for ( let n = 0 ; n < sticky . length ; n ++ ) {
321+ const time = sticky . time ( n ) ;
322+ if ( time == null ) continue ;
323+ if ( now - time . valueOf ( ) <= minAge ) {
324+ break ;
325+ }
326+ const update = sticky . get ( n ) as StickyUpdate ;
327+ if ( noInterest ( update . pattern ) ) {
328+ const seq = sticky . seq ( n ) ;
329+ if ( seq != null ) {
330+ seqs2 . push ( seq ) ;
331+ }
332+ }
333+ }
334+ if ( seqs2 . length > 0 ) {
335+ // [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
336+ logger . debug ( "trimClusterStream: trimming sticky" , { seqs2 } ) ;
337+ await sticky . delete ( { seqs : seqs2 } ) ;
338+ logger . debug ( "trimClusterStream: successfully trimmed sticky" , { seqs2 } ) ;
339+ }
340+
341+ return { seqsInterest : seqs , seqsSticky : seqs2 } ;
279342}
280343
281344function hashSet ( X : Set < string > ) : number {
@@ -299,3 +362,15 @@ export function hashInterest(
299362) : number {
300363 return interest . hash ( hashInterestValue ) ;
301364}
365+
366+ export function hashSticky ( sticky : Sticky ) : number {
367+ let h = 0 ;
368+ for ( const pattern in sticky ) {
369+ h += hash_string ( pattern ) ;
370+ const x = sticky [ pattern ] ;
371+ for ( const subject in x ) {
372+ h += hash_string ( x [ subject ] ) ;
373+ }
374+ }
375+ return h ;
376+ }
0 commit comments