Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions .changeset/angry-clocks-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
'@graphql-yoga/subscription': minor
---

Introduce new object based call signature for the `PubSub.subscribe` method.

```ts
import { createPubSub } from 'graphql-yoga'
import { SlidingBuffer } from '@repeaterjs/repeater'

const pubSub = createPubSub()

pubSub.subscribe({
topic: "userChanged",
id: "1",
buffer: new SlidingBuffer()
})
```

Introduce new object based call signature for the `PubSub.publish` method.

```ts
import { createPubSub } from 'graphql-yoga'
import { SlidingBuffer } from '@repeaterjs/repeater'

const pubSub = createPubSub()

pubSub.publish({
topic: "userChanged",
id: "1",
})
```

For now, both the old and new call signatures will be supported, but we might consider only supporting the new call signature in a new major release.

Support providing a `RepeaterBuffer` to the `PubSub.subscribe` method, by using the new object based call signature.

```ts
import { createPubSub } from 'graphql-yoga'
import { SlidingBuffer } from '@repeaterjs/repeater'

const pubSub = createPubSub()

pubSub.subscribe({
topic: "userChanged",
id: "1",
buffer: new SlidingBuffer(1_000)
})
```

Learn more about buffers on the [Repeater.js website](https://repeater.js.org/docs/safety#3-buffering-and-dropping-values).
145 changes: 108 additions & 37 deletions packages/subscription/src/create-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { TypedEventTarget } from '@graphql-yoga/typed-event-target';
import { Repeater } from '@repeaterjs/repeater';
import { Repeater, type RepeaterBuffer } from '@repeaterjs/repeater';
import { CustomEvent } from '@whatwg-node/events';

type PubSubPublishArgsByKey = {
Expand Down Expand Up @@ -38,16 +38,42 @@ export type PubSub<TPubSubPublishArgsByKey extends PubSubPublishArgsByKey> = {
* Publish a value for a given topic.
*/
publish<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
routingKey: TKey,
...args: TPubSubPublishArgsByKey[TKey]
...args:
| [
args: {
topic: TKey;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
payload: TPubSubPublishArgsByKey[TKey][0];
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
payload: TPubSubPublishArgsByKey[TKey][1];
}),
]
| [routingKey: TKey, ...args: TPubSubPublishArgsByKey[TKey]]
): void;
/**
* Subscribe to a topic.
*/
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [TKey]
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
...args:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fun

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fun or breaking change 😆

| (TPubSubPublishArgsByKey[TKey][1] extends undefined
? [key: TKey]
: [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]])
| [
args: {
topic: string;
buffer?: RepeaterBuffer | undefined;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
}),
]
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? MapToNull<TPubSubPublishArgsByKey[TKey][0]>
Expand All @@ -64,41 +90,86 @@ export const createPubSub = <TPubSubPublishArgsByKey extends PubSubPublishArgsBy
const target =
config?.eventTarget ?? (new EventTarget() as PubSubEventTarget<TPubSubPublishArgsByKey>);

return {
publish<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
routingKey: TKey,
...args: TPubSubPublishArgsByKey[TKey]
) {
const payload = args[1] ?? args[0] ?? null;
const topic = args[1] === undefined ? routingKey : `${routingKey}:${args[0] as number}`;
function subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...args:
| (TPubSubPublishArgsByKey[TKey][1] extends undefined
? [key: TKey]
: [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]])
| [
args: {
topic: string;
buffer?: RepeaterBuffer | undefined;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
}),
]
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? TPubSubPublishArgsByKey[TKey][0]
: TPubSubPublishArgsByKey[TKey][1]
> {
let topic: string;
let buffer: RepeaterBuffer | undefined;
if (typeof args[0] === 'string') {
topic = args[1] === undefined ? args[0] : `${args[0]}:${args[1]}`;
} else {
topic = args[0].id === undefined ? args[0].topic : `${args[0].topic}:${args[0].id}`;
buffer = args[0].buffer;
}

const event: PubSubEvent<TPubSubPublishArgsByKey, TKey> = new CustomEvent(topic, {
detail: payload,
return new Repeater(function subscriptionRepeater(next, stop) {
stop.then(function subscriptionRepeaterStopHandler() {
target.removeEventListener(topic, pubsubEventListener);
});
target.dispatchEvent(event);
},
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
? [TKey]
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
): Repeater<
TPubSubPublishArgsByKey[TKey][1] extends undefined
? TPubSubPublishArgsByKey[TKey][0]
: TPubSubPublishArgsByKey[TKey][1]
> {
const topic = id === undefined ? routingKey : `${routingKey}:${id as number}`;

return new Repeater(function subscriptionRepeater(next, stop) {
stop.then(function subscriptionRepeaterStopHandler() {
target.removeEventListener(topic, pubsubEventListener);
});
target.addEventListener(topic, pubsubEventListener);

target.addEventListener(topic, pubsubEventListener);
function pubsubEventListener(event: PubSubEvent<TPubSubPublishArgsByKey, TKey>) {
next(event.detail);
}
}, buffer);
}

function pubsubEventListener(event: PubSubEvent<TPubSubPublishArgsByKey, TKey>) {
next(event.detail);
}
});
},
function publish<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
...args:
| [
args: {
topic: TKey;
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
? {
id?: void;
payload: TPubSubPublishArgsByKey[TKey][0];
}
: {
id: TPubSubPublishArgsByKey[TKey][0];
payload: TPubSubPublishArgsByKey[TKey][1];
}),
]
| [routingKey: TKey, ...args: TPubSubPublishArgsByKey[TKey]]
): void {
let payload: unknown;
let topic: string;
if (typeof args[0] === 'string') {
payload = args[2] ?? args[1] ?? null;
topic = args[2] === undefined ? args[0] : `${args[0]}:${args[1]}`;
} else {
const arg = args[0];
payload = arg.payload;
topic = arg.id === undefined ? arg.topic : `${arg.topic}:${arg.id}`;
}

const event: PubSubEvent<TPubSubPublishArgsByKey, TKey> = new CustomEvent(topic, {
detail: payload,
});
target.dispatchEvent(event);
}

return {
publish,
subscribe,
};
};
44 changes: 44 additions & 0 deletions packages/subscription/src/createPubSub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,49 @@ describe('createPubSub', () => {
const result = await allValues;
expect(result).toEqual([null, null, null]);
});
it('subscribe to object based API', async () => {
const pubSub = createPubSub<{
a: [number];
}>({
eventTarget: createEventTarget(),
});

const sub = pubSub.subscribe({
topic: 'a',
});
const allValues = collectAsyncIterableValues(sub);
pubSub.publish('a', 1);
pubSub.publish('a', 2);
pubSub.publish('a', 3);

setImmediate(() => {
sub.return();
});

const result = await allValues;
expect(result).toEqual([1, 2, 3]);
});
it('subscribe to fine-grained topic with object based API', async () => {
const pubSub = createPubSub<{
a: [id: string, payload: number];
}>({
eventTarget: createEventTarget(),
});
const id1 = '1';
const sub1 = pubSub.subscribe({
topic: 'a',
id: id1,
});
const allValues1 = collectAsyncIterableValues(sub1);
pubSub.publish('a', id1, 1);
pubSub.publish('a', id1, 2);
pubSub.publish('a', id1, 3);
setImmediate(() => {
sub1.return();
});

const result1 = await allValues1;
expect(result1).toEqual([1, 2, 3]);
});
});
});
Loading
Loading