Skip to content

Commit 239677a

Browse files
Async compatible Hive PubSub, Redis and NATS PubSub, EDFS (#1395)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 5e94f24 commit 239677a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1914
-266
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@graphql-hive/pubsub': patch
3+
---
4+
5+
dependencies updates:
6+
7+
- Added dependency [`@whatwg-node/promise-helpers@^1.3.0` ↗︎](https://www.npmjs.com/package/@whatwg-node/promise-helpers/v/1.3.0) (to `dependencies`)
8+
- Added dependency [`@nats-io/nats-core@^3` ↗︎](https://www.npmjs.com/package/@nats-io/nats-core/v/3.0.0) (to `peerDependencies`)
9+
- Added dependency [`ioredis@^5` ↗︎](https://www.npmjs.com/package/ioredis/v/5.0.0) (to `peerDependencies`)

.changeset/hip-suits-dress.md

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
---
2+
'@graphql-hive/pubsub': major
3+
---
4+
5+
Complete API redesign with async support and distributed Redis PubSub
6+
7+
## Redesigned interface
8+
9+
```ts
10+
import type { DisposableSymbols } from '@whatwg-node/disposablestack';
11+
import type { MaybePromise } from '@whatwg-node/promise-helpers';
12+
13+
export type TopicDataMap = { [topic: string]: any /* data */ };
14+
15+
export type PubSubListener<
16+
Data extends TopicDataMap,
17+
Topic extends keyof Data,
18+
> = (data: Data[Topic]) => void;
19+
20+
export interface PubSub<M extends TopicDataMap = TopicDataMap> {
21+
/**
22+
* Publish {@link data} for a {@link topic}.
23+
* @returns `void` or a `Promise` that resolves when the data has been successfully published
24+
*/
25+
publish<Topic extends keyof M>(
26+
topic: Topic,
27+
data: M[Topic],
28+
): MaybePromise<void>;
29+
/**
30+
* A distinct list of all topics that are currently subscribed to.
31+
* Can be a promise to accomodate distributed systems where subscribers exist on other
32+
* locations and we need to know about all of them.
33+
*/
34+
subscribedTopics(): MaybePromise<Iterable<keyof M>>;
35+
/**
36+
* Subscribe and listen to a {@link topic} receiving its data.
37+
*
38+
* If the {@link listener} is provided, it will be called whenever data is emitted for the {@link topic},
39+
*
40+
* @returns an unsubscribe function or a `Promise<unsubscribe function>` that resolves when the subscription is successfully established. the unsubscribe function returns `void` or a `Promise` that resolves on successful unsubscribe and subscription cleanup
41+
*
42+
* If the {@link listener} is not provided,
43+
*
44+
* @returns an `AsyncIterable` that yields data for the given {@link topic}
45+
*/
46+
subscribe<Topic extends keyof M>(topic: Topic): AsyncIterable<M[Topic]>;
47+
subscribe<Topic extends keyof M>(
48+
topic: Topic,
49+
listener: PubSubListener<M, Topic>,
50+
): MaybePromise<() => MaybePromise<void>>;
51+
/**
52+
* Closes active subscriptions and disposes of all resources. Publishing and subscribing after disposal
53+
* is not possible and will throw an error if attempted.
54+
*/
55+
dispose(): MaybePromise<void>;
56+
/** @see {@link dispose} */
57+
[DisposableSymbols.asyncDispose](): Promise<void>;
58+
}
59+
```
60+
61+
## New `NATSPubSub` for a NATS-powered pubsub
62+
63+
```sh
64+
npm i @nats-io/transport-node
65+
```
66+
67+
```ts filename="gateway.config.ts"
68+
import { defineConfig } from '@graphql-hive/gateway';
69+
import { NATSPubSub } from '@graphql-hive/pubsub/nats';
70+
import { connect } from '@nats-io/transport-node';
71+
72+
export const gatewayConfig = defineConfig({
73+
maskedErrors: false,
74+
pubsub: new NATSPubSub(
75+
await connect(),
76+
{
77+
// we make sure to use the same prefix for all gateways to share the same channels and pubsub.
78+
// meaning, all gateways using this channel prefix will receive and publish to the same topics
79+
subjectPrefix: 'my-app',
80+
},
81+
),
82+
});
83+
```
84+
85+
## New `RedisPubSub` for a Redis-powered pubsub
86+
87+
```sh
88+
npm i ioredis
89+
```
90+
91+
```ts
92+
import { RedisPubSub } from '@graphql-hive/pubsub/redis';
93+
import Redis from 'ioredis';
94+
95+
/**
96+
* When a Redis connection enters "subscriber mode" (after calling SUBSCRIBE), it can only execute
97+
* subscriber commands (SUBSCRIBE, UNSUBSCRIBE, etc.). Meaning, it cannot execute other commands like PUBLISH.
98+
* To avoid this, we use two separate Redis clients: one for publishing and one for subscribing.
99+
*/
100+
const pub = new Redis();
101+
const sub = new Redis();
102+
103+
const pubsub = new RedisPubSub(
104+
{ pub, sub },
105+
// if the chanel prefix is the shared between services, the topics will be shared as well
106+
// this means that if you have multiple services using the same channel prefix, they will
107+
// receive each other's messages
108+
{ channelPrefix: 'my-app' }
109+
);
110+
```
111+
112+
## Migrating
113+
114+
The main migration effort involves:
115+
1. Updating import statements
116+
2. Adding `await` to async operations
117+
3. Replacing subscription ID pattern with unsubscribe functions
118+
4. Replacing `asyncIterator()` with overloaded `subscribe()`
119+
5. Choosing between `MemPubSub` and `RedisPubSub` implementations
120+
6. Using the `PubSub` interface instead of `HivePubSub`
121+
122+
Before:
123+
124+
```typescript
125+
import { PubSub, HivePubSub } from '@graphql-hive/pubsub';
126+
127+
interface TopicMap {
128+
userCreated: { id: string; name: string };
129+
orderPlaced: { orderId: string; amount: number };
130+
}
131+
132+
const pubsub: HivePubSub<TopicMap> = new PubSub();
133+
134+
// Subscribe
135+
const subId = pubsub.subscribe('userCreated', (user) => {
136+
console.log('User created:', user.name);
137+
});
138+
139+
// Publish
140+
pubsub.publish('userCreated', { id: '1', name: 'John' });
141+
142+
// Async iteration
143+
(async () => {
144+
for await (const order of pubsub.asyncIterator('orderPlaced')) {
145+
console.log('Order placed:', order.orderId);
146+
}
147+
})();
148+
149+
// Get topics
150+
const topics = pubsub.getEventNames();
151+
152+
// Unsubcribe
153+
pubsub.unsubscribe(subId);
154+
155+
// Dispose/destroy the pubsub
156+
pubsub.dispose();
157+
```
158+
159+
After:
160+
161+
```typescript
162+
import { MemPubSub, PubSub } from '@graphql-hive/pubsub';
163+
164+
interface TopicMap {
165+
userCreated: { id: string; name: string };
166+
orderPlaced: { orderId: string; amount: number };
167+
}
168+
169+
const pubsub: PubSub<TopicMap> = new MemPubSub();
170+
171+
// Subscribe
172+
const unsubscribe = await pubsub.subscribe('userCreated', (user) => {
173+
console.log('User created:', user.name);
174+
});
175+
176+
// Publish
177+
await pubsub.publish('userCreated', { id: '1', name: 'John' });
178+
179+
// Async iteration
180+
(async () => {
181+
for await (const order of pubsub.subscribe('orderPlaced')) {
182+
console.log('Order placed:', order.orderId);
183+
}
184+
})();
185+
186+
// Get topics
187+
const topics = await pubsub.subscribedTopics();
188+
189+
// Unsubscribe
190+
await unsubscribe();
191+
192+
// Dispose/destroy the pubsub
193+
await pubsub.dispose();
194+
```
195+
196+
### Interface renamed from `HivePubSub` to just `PubSub`
197+
198+
```diff
199+
- import { HivePubSub } from '@graphql-hive/pubsub';
200+
+ import { PubSub } from '@graphql-hive/pubsub';
201+
```
202+
203+
### `subscribedTopics()` method signature change
204+
205+
This method is now required and supports async operations.
206+
207+
```diff
208+
- subscribedTopics?(): Iterable; // Optional
209+
+ subscribedTopics(): MaybePromise<Iterable>; // Required, supports async
210+
```
211+
212+
### `publish()` method signature change
213+
214+
Publishing can now be async and may return a promise.
215+
216+
```diff
217+
- publish<Topic extends keyof Data>(topic: Topic, data: Data[Topic]): void;
218+
+ publish<Topic extends keyof M>(topic: Topic, data: M[Topic]): MaybePromise<void>;
219+
```
220+
221+
Migrating existing code:
222+
223+
```diff
224+
- pubsub.publish('topic', data);
225+
+ await pubsub.publish('topic', data);
226+
```
227+
228+
### `subscribe()` method signature change
229+
230+
Subscribe now returns an unsubscribe function instead of a subscription ID.
231+
232+
```diff
233+
subscribe<Topic extends keyof Data>(
234+
topic: Topic,
235+
listener: PubSubListener<Data, Topic>
236+
- ): number; // Returns subscription ID
237+
+ ): MaybePromise<() => MaybePromise<void>>; // Returns unsubscribe function
238+
```
239+
240+
Migrating existing code:
241+
242+
```diff
243+
- const subId = pubsub.subscribe('topic', (data) => {
244+
- console.log(data);
245+
- });
246+
- pubsub.unsubscribe(subId);
247+
+ const unsubscribe = await pubsub.subscribe('topic', (data) => {
248+
+ console.log(data);
249+
+ });
250+
+ await unsubscribe();
251+
```
252+
253+
### `dispose()` method signature change
254+
255+
Disposal is now required and supports async operations.
256+
257+
```diff
258+
- dispose?(): void; // Optional
259+
+ dispose(): MaybePromise<void>; // Required, supports async
260+
```
261+
262+
Migrating existing code:
263+
264+
```diff
265+
- pubsub.dispose();
266+
+ await pubsub.dispose();
267+
```
268+
269+
### Removed `getEventNames()` method
270+
271+
This deprecated method was removed. Use `subscribedTopics()` instead.
272+
273+
```diff
274+
- getEventNames(): Iterable<keyof Data>;
275+
```
276+
277+
Migrating existing code:
278+
279+
```diff
280+
- const topics = pubsub.getEventNames();
281+
+ const topics = await pubsub.subscribedTopics();
282+
```
283+
284+
### Removed `unsubscribe()` method
285+
286+
The centralized unsubscribe method was removed. Each subscription now returns its own unsubscribe function.
287+
288+
```diff
289+
- unsubscribe(subId: number): void;
290+
```
291+
292+
Migrating existing code by using the unsubscribe function returned by `subscribe()` instead:
293+
294+
```diff
295+
- const subId = pubsub.subscribe('topic', listener);
296+
- pubsub.unsubscribe(subId);
297+
298+
+ const unsubscribe = await pubsub.subscribe('topic', listener);
299+
+ await unsubscribe();
300+
```
301+
302+
### Removed `asyncIterator()` Method
303+
304+
The separate async iterator method was removed. Call `subscribe()` without a listener to get an async iterable.
305+
306+
```diff
307+
- asyncIterator<Topic extends keyof Data>(topic: Topic): AsyncIterable<Data[Topic]>;
308+
```
309+
310+
Migrating existing code:
311+
312+
```diff
313+
- for await (const data of pubsub.asyncIterator('topic')) {
314+
+ for await (const data of pubsub.subscribe('topic')) {
315+
console.log(data);
316+
}
317+
```
318+
319+
### `MemPubSub` is the in-memory pubsub implementation
320+
321+
The generic `PubSub` class was replaced with implementation specific `MemPubSub` for an in-memory pubsub.
322+
323+
```diff
324+
- import { PubSub } from '@graphql-hive/pubsub';
325+
- const pubsub = new PubSub();
326+
+ import { MemPubSub } from '@graphql-hive/pubsub';
327+
+ const pubsub = new MemPubSub();
328+
```

.changeset/twelve-pets-run.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@graphql-hive/pubsub': patch
3+
---
4+
5+
Export TopicDataMap type for easier external implementations

.github/workflows/examples.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ jobs:
5454
- subscriptions-with-transforms
5555
- type-merging-batching
5656
- operation-field-permissions
57+
- operation-field-permissions
58+
# TODO: support containers before enabling (we use redis image in e2e)
59+
# - distributed-subscriptions-webhooks
5760
name: Convert ${{matrix.e2e}}
5861
runs-on: ubuntu-latest
5962
steps:

.prettierignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
.yarn/*
2+
yarn.lock
23
!.yarn/custom-plugins
34
Dockerfile
45
packages/runtime/src/landing-page.generated.ts
56
__generated__
7+
__snapshots__
68
.changeset/*
79
!.changeset/README.md
810
!.changeset/config.json
@@ -15,3 +17,5 @@ __generated__
1517
/e2e/load-on-init/malformed.graphql
1618
CHANGELOG.md
1719
/internal/heapsnapshot/dist/
20+
*.tar.gz
21+
*.tgz

0 commit comments

Comments
 (0)