1+ const noop = ( ) => { /* noop */ }
2+
13export default function Subscribe ( postgres , options ) {
2- const listeners = new Map ( )
4+ const subscribers = new Map ( )
5+ , slot = 'postgresjs_' + Math . random ( ) . toString ( 36 ) . slice ( 2 )
6+ , state = { }
37
48 let connection
5-
6- return async function subscribe ( event , fn ) {
7- event = parseEvent ( event )
8-
9- options . max = 1
10- options . onclose = onclose
11- options . fetch_types = false
12- options . connection = {
9+ , stream
10+ , ended = false
11+
12+ const sql = subscribe . sql = postgres ( {
13+ ...options ,
14+ max : 1 ,
15+ fetch_types : false ,
16+ idle_timeout : null ,
17+ max_lifetime : null ,
18+ connection : {
1319 ...options . connection ,
1420 replication : 'database'
15- }
21+ } ,
22+ onclose : async function ( ) {
23+ if ( ended )
24+ return
25+ stream = null
26+ state . pid = state . secret = undefined
27+ ! ended && connected ( await init ( sql , slot , options . publications ) )
28+ subscribers . forEach ( event => event . forEach ( ( { onsubscribe } ) => onsubscribe ( ) ) )
29+ } ,
30+ no_subscribe : true
31+ } )
1632
17- let stream
18- , ended = false
33+ const end = sql . end
34+ , close = sql . close
1935
20- const sql = postgres ( options )
21- , slot = 'postgresjs_' + Math . random ( ) . toString ( 36 ) . slice ( 2 )
22- , end = sql . end
36+ sql . end = async ( ) => {
37+ ended = true
38+ stream && ( await new Promise ( r => ( stream . once ( 'end' , r ) , stream . end ( ) ) ) )
39+ return end ( )
40+ }
2341
24- sql . end = async ( ) => {
25- ended = true
26- stream && ( await new Promise ( r => ( stream . once ( 'end' , r ) , stream . end ( ) ) ) )
27- return end ( )
28- }
42+ sql . close = async ( ) => {
43+ stream && ( await new Promise ( r => ( stream . once ( 'end' , r ) , stream . end ( ) ) ) )
44+ return close ( )
45+ }
46+
47+ return subscribe
2948
30- ! connection && ( subscribe . sql = sql , connection = init ( sql , slot , options . publications ) )
49+ async function subscribe ( event , fn , onsubscribe = noop ) {
50+ event = parseEvent ( event )
3151
32- const fns = listeners . has ( event )
33- ? listeners . get ( event ) . add ( fn )
34- : listeners . set ( event , new Set ( [ fn ] ) ) . get ( event )
52+ if ( ! connection )
53+ connection = init ( sql , slot , options . publications )
54+
55+ const subscriber = { fn, onsubscribe }
56+ const fns = subscribers . has ( event )
57+ ? subscribers . get ( event ) . add ( subscriber )
58+ : subscribers . set ( event , new Set ( [ subscriber ] ) ) . get ( event )
3559
3660 const unsubscribe = ( ) => {
37- fns . delete ( fn )
38- fns . size === 0 && listeners . delete ( event )
61+ fns . delete ( subscriber )
62+ fns . size === 0 && subscribers . delete ( event )
3963 }
4064
41- return connection . then ( x => ( stream = x , { unsubscribe } ) )
65+ return connection . then ( x => {
66+ connected ( x )
67+ onsubscribe ( )
68+ return { unsubscribe, state, sql }
69+ } )
70+ }
4271
43- async function onclose ( ) {
44- stream = null
45- ! ended && ( stream = await init ( sql , slot , options . publications ) )
46- }
72+ function connected ( x ) {
73+ stream = x . stream
74+ state . pid = x . state . pid
75+ state . secret = x . state . secret
4776 }
4877
4978 async function init ( sql , slot , publications ) {
5079 if ( ! publications )
5180 throw new Error ( 'Missing publication names' )
5281
53- const [ x ] = await sql . unsafe (
82+ const xs = await sql . unsafe (
5483 `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT`
5584 )
5685
86+ const [ x ] = xs
87+
5788 const stream = await sql . unsafe (
5889 `START_REPLICATION SLOT ${ slot } LOGICAL ${
5990 x . consistent_point
@@ -65,12 +96,10 @@ export default function Subscribe(postgres, options) {
6596 }
6697
6798 stream . on ( 'data' , data )
68- stream . on ( 'error' , ( error ) => {
69- console . error ( 'Logical Replication Error - Reconnecting' , error ) // eslint-disable-line
70- sql . end ( )
71- } )
99+ stream . on ( 'error' , sql . close )
100+ stream . on ( 'close' , sql . close )
72101
73- return stream
102+ return { stream, state : xs . state }
74103
75104 function data ( x ) {
76105 if ( x [ 0 ] === 0x77 )
@@ -99,7 +128,7 @@ export default function Subscribe(postgres, options) {
99128 }
100129
101130 function call ( x , a , b ) {
102- listeners . has ( x ) && listeners . get ( x ) . forEach ( fn => fn ( a , b , x ) )
131+ subscribers . has ( x ) && subscribers . get ( x ) . forEach ( ( { fn } ) => fn ( a , b , x ) )
103132 }
104133}
105134
0 commit comments