diff --git a/.changeset/angry-clocks-sniff.md b/.changeset/angry-clocks-sniff.md new file mode 100644 index 0000000000..5942a1ec80 --- /dev/null +++ b/.changeset/angry-clocks-sniff.md @@ -0,0 +1,51 @@ +--- +'@graphql-yoga/subscription': minor +--- + +Introduce new object based call signature for the `PubSub.subscribe` method. + +```ts +import { createPubSub } from 'graphql-yoga' +import { SlidingBuffer } from '@repeaterjs/repeater' + +const pubSub = createPubSub() + +pubSub.subscribe({ + topic: "userChanged", + id: "1", + buffer: new SlidingBuffer() +}) +``` + +Introduce new object based call signature for the `PubSub.publish` method. + +```ts +import { createPubSub } from 'graphql-yoga' +import { SlidingBuffer } from '@repeaterjs/repeater' + +const pubSub = createPubSub() + +pubSub.publish({ + topic: "userChanged", + id: "1", +}) +``` + +For now, both the old and new call signatures will be supported, but we might consider only supporting the new call signature in a new major release. + +Support providing a `RepeaterBuffer` to the `PubSub.subscribe` method, by using the new object based call signature. + +```ts +import { createPubSub } from 'graphql-yoga' +import { SlidingBuffer } from '@repeaterjs/repeater' + +const pubSub = createPubSub() + +pubSub.subscribe({ + topic: "userChanged", + id: "1", + buffer: new SlidingBuffer(1_000) +}) +``` + +Learn more about buffers on the [Repeater.js website](https://repeater.js.org/docs/safety#3-buffering-and-dropping-values). diff --git a/packages/subscription/src/create-pub-sub.ts b/packages/subscription/src/create-pub-sub.ts index 447d093a5a..10ec7e76cb 100644 --- a/packages/subscription/src/create-pub-sub.ts +++ b/packages/subscription/src/create-pub-sub.ts @@ -1,5 +1,5 @@ import type { TypedEventTarget } from '@graphql-yoga/typed-event-target'; -import { Repeater } from '@repeaterjs/repeater'; +import { Repeater, type RepeaterBuffer } from '@repeaterjs/repeater'; import { CustomEvent } from '@whatwg-node/events'; type PubSubPublishArgsByKey = { @@ -38,16 +38,42 @@ export type PubSub = { * Publish a value for a given topic. */ publish>( - routingKey: TKey, - ...args: TPubSubPublishArgsByKey[TKey] + ...args: + | [ + args: { + topic: TKey; + } & (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? { + id?: void; + payload: TPubSubPublishArgsByKey[TKey][0]; + } + : { + id: TPubSubPublishArgsByKey[TKey][0]; + payload: TPubSubPublishArgsByKey[TKey][1]; + }), + ] + | [routingKey: TKey, ...args: TPubSubPublishArgsByKey[TKey]] ): void; /** * Subscribe to a topic. */ subscribe>( - ...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined - ? [TKey] - : [TKey, TPubSubPublishArgsByKey[TKey][0]] + ...args: + | (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? [key: TKey] + : [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]]) + | [ + args: { + topic: string; + buffer?: RepeaterBuffer | undefined; + } & (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? { + id?: void; + } + : { + id: TPubSubPublishArgsByKey[TKey][0]; + }), + ] ): Repeater< TPubSubPublishArgsByKey[TKey][1] extends undefined ? MapToNull @@ -64,41 +90,86 @@ export const createPubSub = ); - return { - publish>( - routingKey: TKey, - ...args: TPubSubPublishArgsByKey[TKey] - ) { - const payload = args[1] ?? args[0] ?? null; - const topic = args[1] === undefined ? routingKey : `${routingKey}:${args[0] as number}`; + function subscribe>( + ...args: + | (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? [key: TKey] + : [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]]) + | [ + args: { + topic: string; + buffer?: RepeaterBuffer | undefined; + } & (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? { + id?: void; + } + : { + id: TPubSubPublishArgsByKey[TKey][0]; + }), + ] + ): Repeater< + TPubSubPublishArgsByKey[TKey][1] extends undefined + ? TPubSubPublishArgsByKey[TKey][0] + : TPubSubPublishArgsByKey[TKey][1] + > { + let topic: string; + let buffer: RepeaterBuffer | undefined; + if (typeof args[0] === 'string') { + topic = args[1] === undefined ? args[0] : `${args[0]}:${args[1]}`; + } else { + topic = args[0].id === undefined ? args[0].topic : `${args[0].topic}:${args[0].id}`; + buffer = args[0].buffer; + } - const event: PubSubEvent = new CustomEvent(topic, { - detail: payload, + return new Repeater(function subscriptionRepeater(next, stop) { + stop.then(function subscriptionRepeaterStopHandler() { + target.removeEventListener(topic, pubsubEventListener); }); - target.dispatchEvent(event); - }, - subscribe>( - ...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined - ? [TKey] - : [TKey, TPubSubPublishArgsByKey[TKey][0]] - ): Repeater< - TPubSubPublishArgsByKey[TKey][1] extends undefined - ? TPubSubPublishArgsByKey[TKey][0] - : TPubSubPublishArgsByKey[TKey][1] - > { - const topic = id === undefined ? routingKey : `${routingKey}:${id as number}`; - return new Repeater(function subscriptionRepeater(next, stop) { - stop.then(function subscriptionRepeaterStopHandler() { - target.removeEventListener(topic, pubsubEventListener); - }); + target.addEventListener(topic, pubsubEventListener); - target.addEventListener(topic, pubsubEventListener); + function pubsubEventListener(event: PubSubEvent) { + next(event.detail); + } + }, buffer); + } - function pubsubEventListener(event: PubSubEvent) { - next(event.detail); - } - }); - }, + function publish>( + ...args: + | [ + args: { + topic: TKey; + } & (TPubSubPublishArgsByKey[TKey][1] extends undefined + ? { + id?: void; + payload: TPubSubPublishArgsByKey[TKey][0]; + } + : { + id: TPubSubPublishArgsByKey[TKey][0]; + payload: TPubSubPublishArgsByKey[TKey][1]; + }), + ] + | [routingKey: TKey, ...args: TPubSubPublishArgsByKey[TKey]] + ): void { + let payload: unknown; + let topic: string; + if (typeof args[0] === 'string') { + payload = args[2] ?? args[1] ?? null; + topic = args[2] === undefined ? args[0] : `${args[0]}:${args[1]}`; + } else { + const arg = args[0]; + payload = arg.payload; + topic = arg.id === undefined ? arg.topic : `${arg.topic}:${arg.id}`; + } + + const event: PubSubEvent = new CustomEvent(topic, { + detail: payload, + }); + target.dispatchEvent(event); + } + + return { + publish, + subscribe, }; }; diff --git a/packages/subscription/src/createPubSub.spec.ts b/packages/subscription/src/createPubSub.spec.ts index c7b603ab0a..2ba38124ae 100644 --- a/packages/subscription/src/createPubSub.spec.ts +++ b/packages/subscription/src/createPubSub.spec.ts @@ -149,5 +149,49 @@ describe('createPubSub', () => { const result = await allValues; expect(result).toEqual([null, null, null]); }); + it('subscribe to object based API', async () => { + const pubSub = createPubSub<{ + a: [number]; + }>({ + eventTarget: createEventTarget(), + }); + + const sub = pubSub.subscribe({ + topic: 'a', + }); + const allValues = collectAsyncIterableValues(sub); + pubSub.publish('a', 1); + pubSub.publish('a', 2); + pubSub.publish('a', 3); + + setImmediate(() => { + sub.return(); + }); + + const result = await allValues; + expect(result).toEqual([1, 2, 3]); + }); + it('subscribe to fine-grained topic with object based API', async () => { + const pubSub = createPubSub<{ + a: [id: string, payload: number]; + }>({ + eventTarget: createEventTarget(), + }); + const id1 = '1'; + const sub1 = pubSub.subscribe({ + topic: 'a', + id: id1, + }); + const allValues1 = collectAsyncIterableValues(sub1); + pubSub.publish('a', id1, 1); + pubSub.publish('a', id1, 2); + pubSub.publish('a', id1, 3); + setImmediate(() => { + sub1.return(); + }); + + const result1 = await allValues1; + expect(result1).toEqual([1, 2, 3]); + }); }); }); diff --git a/website/src/pages/docs/features/subscriptions.mdx b/website/src/pages/docs/features/subscriptions.mdx index 201dfec836..bbcdc837fe 100644 --- a/website/src/pages/docs/features/subscriptions.mdx +++ b/website/src/pages/docs/features/subscriptions.mdx @@ -314,14 +314,14 @@ const yoga = createYoga({ Subscription: { randomNumber: { // subscribe to the randomNumber event - subscribe: () => pubSub.subscribe('randomNumber'), + subscribe: () => pubSub.subscribe({ topic: 'randomNumber' }), resolve: payload => payload } }, Mutation: { broadcastRandomNumber: (_, args) => { // publish a random number - pubSub.publish('randomNumber', Math.random()) + pubSub.publish({ topic: 'randomNumber', payload: Math.random() }) } } } @@ -344,16 +344,18 @@ const pubSub = createPubSub<{ randomNumber: [randomNumber: number] }>() -pubsub.subscribe('randomNumber') +pubsub.subscribe({ + topic: 'randomNumber' +}) // This is now type-safe. -pubSub.publish('randomNumber', 1) +pubSub.publish({ topic: 'randomNumber', payload: 1 }) // This causes a TypeScript error. -pubSub.publish('randomNumber') +pubSub.publish({ topic: 'randomNumber' }) // This causes a TypeScript error. -pubSub.publish('event does not exist') +pubSub.publish({ topic: 'event does not exist' }) ``` You can subscribe to a specific topic using `pubSub.subscribe`. @@ -365,7 +367,9 @@ const pubSub = createPubSub<{ // Usage outside a GraphQL subscribe function async function subscribe() { - const eventSource = pubSub.subscribe('randomNumber') + const eventSource = pubSub.subscribe({ + topic: 'randomNumber' + }) for await (const value of eventSource) { console.log(value) @@ -376,7 +380,7 @@ async function subscribe() { subscribe() -pubSub.publish('randomNumber', 3) +pubSub.publish({ topic: 'randomNumber', payload: 3 }) ``` You can publish a value using `pubSub.publish`. @@ -386,7 +390,7 @@ const pubSub = createPubSub<{ randomNumber: [randomNumber: number] }>() -pubSub.publish('randomNumber', 3) +pubSub.publish({ topic: 'randomNumber', payload: 3 }) ``` #### Topic Configuration Variants @@ -403,9 +407,9 @@ const pubSub = createPubSub<{ 'event:payload:obj': [payload: { foo: number }] }>() -pubSub.publish('event:without:payload') -pubSub.publish('event:payload:number', 12) -pubSub.publish('event:payload:obj', { foo: 1 }) +pubSub.publish({ topic: 'event:without:payload' }) +pubSub.publish({ topic: 'event:payload:number', payload: 12 }) +pubSub.publish({ topic: 'event:payload:obj', payload: { foo: 1 } }) ``` #### Topic with Dynamic ID @@ -422,11 +426,11 @@ const userId1 = '420' const userId2 = '69' // the userId argument is enforced by the TypeScript compiler. -pubSub.subscribe('user:followerCount', userId1) -pubSub.subscribe('user:followerCount', userId2) +pubSub.subscribe({ topic: 'user:followerCount', id: userId1 }) +pubSub.subscribe({ topic: 'user:followerCount', id: userId2 }) -pubSub.publish('user:followerCount', userId1, { followerCount: 30 }) -pubSub.publish('user:followerCount', userId2, { followerCount: 12 }) +pubSub.publish({ topic: 'user:followerCount', id: userId1, payload: { followerCount: 30 } }) +pubSub.publish({ topic: 'user:followerCount', id: userId2, payload: { followerCount: 12 } }) ``` ### Distributed Pub/Sub for Production @@ -523,7 +527,7 @@ const pubSub = createPubSub<{ }>() const source = pipe( - pubSub.subscribe('randomNumber'), + pubSub.subscribe({ topic: 'randomNumber' }), map(publishedNumber => publishedNumber * 2), filter(multipliedNumber => multipliedNumber < 10) ) @@ -534,10 +538,10 @@ const source = pipe( } })() -pubSub.publish('randomNumber', 1) // logs 2 -pubSub.publish('randomNumber', 2) // logs 4 -pubSub.publish('randomNumber', 5) // filtered out -pubSub.publish('randomNumber', 3) // logs 6 +pubSub.publish({ topic: 'randomNumber', payload: 1 }) // logs 2 +pubSub.publish({ topic: 'randomNumber', payload: 2 }) // logs 4 +pubSub.publish({ topic: 'randomNumber', payload: 5 }) // filtered out +pubSub.publish({ topic: 'randomNumber', payload: 3 }) // logs 6 source.return() ``` @@ -571,7 +575,7 @@ const yoga = createYoga({ // subscribe to the randomNumber event subscribe: (_, args) => pipe( - pubSub.subscribe('randomNumber'), + pubSub.subscribe({ topic: 'randomNumber' }), map(publishedNumber => publishedNumber * args.multiplyBy), filter(multipliedNumber => multipliedNumber < args.lessThan) ), @@ -581,7 +585,7 @@ const yoga = createYoga({ Mutation: { broadcastRandomNumber: (_, args) => { // publish a random number - pubSub.publish('randomNumber', Math.random()) + pubSub.publish({ topic: 'randomNumber', payload: Math.random() }) } } } @@ -648,7 +652,7 @@ const yoga = createYoga({ // upon initiating the subscription undefined, // event stream for future updates - pubSub.subscribe('globalCounter:change') + pubSub.subscribe({ topic: 'globalCounter:change' }) ]), // map all stream values to the latest globalCounter map(() => globalCounter) @@ -660,7 +664,7 @@ const yoga = createYoga({ incrementGlobalCounter: () => { globalCounter = globalCounter + 1 // publish a global counter increment event - pubSub.publish('globalCounter:change') + pubSub.publish({ topic: 'globalCounter:change' }) return globalCounter } } @@ -728,8 +732,8 @@ const yoga = createYoga({ subscribe: () => pipe( Repeater.merge([ - pubSub.subscribe('userLoginChanged'), - pubSub.subscribe('userDeleted') + pubSub.subscribe({ topic: 'userLoginChanged' }), + pubSub.subscribe({ topic: 'userDeleted' }) ]), // map all stream values to the latest user map(() => user) @@ -740,7 +744,7 @@ const yoga = createYoga({ Mutation: { deleteUser() { user = null - pubSub.publish('userDeleted') + pubSub.publish({ topic: 'userDeleted' }) return true }, updateUserLogin(_, args) { @@ -748,7 +752,7 @@ const yoga = createYoga({ return false } user.login = args.newLogin - pubSub.publish('userLoginChanged') + pubSub.publish({ topic: 'userLoginChanged' }) return true } }