Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
322c1d9
e2e redis pubsub and hack
enisdenjo Aug 19, 2025
1a4dc3c
additional type defs
enisdenjo Aug 19, 2025
fcaca99
more wait ensure
enisdenjo Aug 19, 2025
5f50940
pubsub
enisdenjo Aug 19, 2025
61c1111
new pubsub
enisdenjo Aug 19, 2025
0f1cee2
sub on error too
enisdenjo Aug 19, 2025
e5eec66
fix and thorough tests
enisdenjo Aug 19, 2025
9670b05
explain redis and distribute e2e
enisdenjo Aug 19, 2025
0fba9fe
pubsub major todo
enisdenjo Aug 19, 2025
77ff0e1
refactor and use the new hive pubsub
enisdenjo Aug 19, 2025
3b70b3c
support legacy mesh
enisdenjo Aug 19, 2025
35b3a17
reminder
enisdenjo Aug 19, 2025
4a79cfd
chore(dependencies): updated changesets for modified dependencies
github-actions[bot] Aug 19, 2025
efad395
dispose and resource management
enisdenjo Aug 19, 2025
7ec34a6
no meshpbusbu
enisdenjo Aug 20, 2025
e0eee41
remove remainders of mesh pubsub
enisdenjo Aug 20, 2025
907e67c
chore(dependencies): updated changesets for modified dependencies
github-actions[bot] Aug 20, 2025
5a374bf
flush before test
enisdenjo Aug 20, 2025
018901b
wait for flush twice
enisdenjo Aug 20, 2025
fbab363
flush as in wait
enisdenjo Aug 20, 2025
53c6964
flush after subscribe then publish
enisdenjo Aug 20, 2025
6b07c75
wait for flush before unsub lol
enisdenjo Aug 21, 2025
41625a0
Revert "wait for flush before unsub lol"
enisdenjo Aug 21, 2025
d8e1f86
Revert "flush after subscribe then publish"
enisdenjo Aug 21, 2025
2e76328
flush after publish before dispose
enisdenjo Aug 21, 2025
c68cab4
bundle pubsub
enisdenjo Aug 21, 2025
ea2f54a
distributed in other envs
enisdenjo Aug 21, 2025
a32537d
no flatmap for v20
enisdenjo Aug 21, 2025
5fc7063
unused import
enisdenjo Aug 21, 2025
d1f6811
no flatmap in redis either
enisdenjo Aug 21, 2025
ad6aabd
bun for e2e
enisdenjo Aug 21, 2025
4ecde9d
fix contexts
enisdenjo Aug 21, 2025
97ceeaf
no foearch
enisdenjo Aug 21, 2025
4c8f4f2
no leaktest for redis pubsub (becaue of container)
enisdenjo Aug 21, 2025
787617a
cancel signal is not available in jest
enisdenjo Aug 21, 2025
cbe9d99
subscribed topic everywhere
enisdenjo Aug 22, 2025
a7d7c59
test
enisdenjo Aug 22, 2025
d05f810
requird channel prefix
enisdenjo Aug 22, 2025
61d516b
no pipe
enisdenjo Aug 22, 2025
b99aacc
changeset
enisdenjo Aug 22, 2025
b32769c
no type issues
enisdenjo Aug 25, 2025
4441298
do not forget
enisdenjo Aug 27, 2025
c377b88
chore(dependencies): updated changesets for modified dependencies
github-actions[bot] Aug 27, 2025
53b0209
dispose is sync
enisdenjo Aug 27, 2025
8c5eebd
not always a promise
enisdenjo Aug 27, 2025
0d99524
unnecessary
enisdenjo Aug 28, 2025
7a94549
spread the promises
enisdenjo Aug 28, 2025
382bde3
edfs
enisdenjo Aug 28, 2025
358ac51
throw if no channel prefix
enisdenjo Aug 28, 2025
14b9f8d
accept undefined too
enisdenjo Aug 28, 2025
25a1617
sub type merg
enisdenjo Aug 29, 2025
b60285a
update snapshot
enisdenjo Aug 29, 2025
6e728ea
ignore some prettier for faster prettier
enisdenjo Aug 29, 2025
a10206d
redis comment correct
enisdenjo Sep 2, 2025
3a704cb
implement nats pubsub
enisdenjo Sep 2, 2025
c52389f
chore(dependencies): updated changesets for modified dependencies
github-actions[bot] Sep 2, 2025
b9dae41
use nats for edfs
enisdenjo Sep 2, 2025
d2071ac
chore(dependencies): updated changesets for modified dependencies
github-actions[bot] Sep 2, 2025
90fc0a7
add nats
enisdenjo Sep 2, 2025
f800e0a
build for nats in docker
enisdenjo Sep 2, 2025
039e9b5
no nats in leaktests
enisdenjo Sep 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/@graphql-hive_pubsub-1395-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@graphql-hive/pubsub': patch
---

dependencies updates:

- Added dependency [`@whatwg-node/promise-helpers@^1.3.0` ↗︎](https://www.npmjs.com/package/@whatwg-node/promise-helpers/v/1.3.0) (to `dependencies`)
- Added dependency [`@nats-io/nats-core@^3` ↗︎](https://www.npmjs.com/package/@nats-io/nats-core/v/3.0.0) (to `peerDependencies`)
- Added dependency [`ioredis@^5` ↗︎](https://www.npmjs.com/package/ioredis/v/5.0.0) (to `peerDependencies`)
328 changes: 328 additions & 0 deletions .changeset/hip-suits-dress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
---
'@graphql-hive/pubsub': major
---

Complete API redesign with async support and distributed Redis PubSub

## Redesigned interface

```ts
import type { DisposableSymbols } from '@whatwg-node/disposablestack';
import type { MaybePromise } from '@whatwg-node/promise-helpers';

export type TopicDataMap = { [topic: string]: any /* data */ };

export type PubSubListener<
Data extends TopicDataMap,
Topic extends keyof Data,
> = (data: Data[Topic]) => void;

export interface PubSub<M extends TopicDataMap = TopicDataMap> {
/**
* Publish {@link data} for a {@link topic}.
* @returns `void` or a `Promise` that resolves when the data has been successfully published
*/
publish<Topic extends keyof M>(
topic: Topic,
data: M[Topic],
): MaybePromise<void>;
/**
* A distinct list of all topics that are currently subscribed to.
* Can be a promise to accomodate distributed systems where subscribers exist on other
* locations and we need to know about all of them.
*/
subscribedTopics(): MaybePromise<Iterable<keyof M>>;
/**
* Subscribe and listen to a {@link topic} receiving its data.
*
* If the {@link listener} is provided, it will be called whenever data is emitted for the {@link topic},
*
* @returns an unsubscribe function or a `Promise<unsubscribe function>` that resolves when the subscription is successfully established. the unsubscribe function returns `void` or a `Promise` that resolves on successful unsubscribe and subscription cleanup
*
* If the {@link listener} is not provided,
*
* @returns an `AsyncIterable` that yields data for the given {@link topic}
*/
subscribe<Topic extends keyof M>(topic: Topic): AsyncIterable<M[Topic]>;
subscribe<Topic extends keyof M>(
topic: Topic,
listener: PubSubListener<M, Topic>,
): MaybePromise<() => MaybePromise<void>>;
/**
* Closes active subscriptions and disposes of all resources. Publishing and subscribing after disposal
* is not possible and will throw an error if attempted.
*/
dispose(): MaybePromise<void>;
/** @see {@link dispose} */
[DisposableSymbols.asyncDispose](): Promise<void>;
}
```

## New `NATSPubSub` for a NATS-powered pubsub

```sh
npm i @nats-io/transport-node
```

```ts filename="gateway.config.ts"
import { defineConfig } from '@graphql-hive/gateway';
import { NATSPubSub } from '@graphql-hive/pubsub/nats';
import { connect } from '@nats-io/transport-node';

export const gatewayConfig = defineConfig({
maskedErrors: false,
pubsub: new NATSPubSub(
await connect(),
{
// we make sure to use the same prefix for all gateways to share the same channels and pubsub.
// meaning, all gateways using this channel prefix will receive and publish to the same topics
subjectPrefix: 'my-app',
},
),
});
```

## New `RedisPubSub` for a Redis-powered pubsub

```sh
npm i ioredis
```

```ts
import { RedisPubSub } from '@graphql-hive/pubsub/redis';
import Redis from 'ioredis';

/**
* When a Redis connection enters "subscriber mode" (after calling SUBSCRIBE), it can only execute
* subscriber commands (SUBSCRIBE, UNSUBSCRIBE, etc.). Meaning, it cannot execute other commands like PUBLISH.
* To avoid this, we use two separate Redis clients: one for publishing and one for subscribing.
*/
const pub = new Redis();
const sub = new Redis();

const pubsub = new RedisPubSub(
{ pub, sub },
// if the chanel prefix is the shared between services, the topics will be shared as well
// this means that if you have multiple services using the same channel prefix, they will
// receive each other's messages
{ channelPrefix: 'my-app' }
);
```

## Migrating

The main migration effort involves:
1. Updating import statements
2. Adding `await` to async operations
3. Replacing subscription ID pattern with unsubscribe functions
4. Replacing `asyncIterator()` with overloaded `subscribe()`
5. Choosing between `MemPubSub` and `RedisPubSub` implementations
6. Using the `PubSub` interface instead of `HivePubSub`

Before:

```typescript
import { PubSub, HivePubSub } from '@graphql-hive/pubsub';

interface TopicMap {
userCreated: { id: string; name: string };
orderPlaced: { orderId: string; amount: number };
}

const pubsub: HivePubSub<TopicMap> = new PubSub();

// Subscribe
const subId = pubsub.subscribe('userCreated', (user) => {
console.log('User created:', user.name);
});

// Publish
pubsub.publish('userCreated', { id: '1', name: 'John' });

// Async iteration
(async () => {
for await (const order of pubsub.asyncIterator('orderPlaced')) {
console.log('Order placed:', order.orderId);
}
})();

// Get topics
const topics = pubsub.getEventNames();

// Unsubcribe
pubsub.unsubscribe(subId);

// Dispose/destroy the pubsub
pubsub.dispose();
```

After:

```typescript
import { MemPubSub, PubSub } from '@graphql-hive/pubsub';

interface TopicMap {
userCreated: { id: string; name: string };
orderPlaced: { orderId: string; amount: number };
}

const pubsub: PubSub<TopicMap> = new MemPubSub();

// Subscribe
const unsubscribe = await pubsub.subscribe('userCreated', (user) => {
console.log('User created:', user.name);
});

// Publish
await pubsub.publish('userCreated', { id: '1', name: 'John' });

// Async iteration
(async () => {
for await (const order of pubsub.subscribe('orderPlaced')) {
console.log('Order placed:', order.orderId);
}
})();

// Get topics
const topics = await pubsub.subscribedTopics();

// Unsubscribe
await unsubscribe();

// Dispose/destroy the pubsub
await pubsub.dispose();
```

### Interface renamed from `HivePubSub` to just `PubSub`

```diff
- import { HivePubSub } from '@graphql-hive/pubsub';
+ import { PubSub } from '@graphql-hive/pubsub';
```

### `subscribedTopics()` method signature change

This method is now required and supports async operations.

```diff
- subscribedTopics?(): Iterable; // Optional
+ subscribedTopics(): MaybePromise<Iterable>; // Required, supports async
```

### `publish()` method signature change

Publishing can now be async and may return a promise.

```diff
- publish<Topic extends keyof Data>(topic: Topic, data: Data[Topic]): void;
+ publish<Topic extends keyof M>(topic: Topic, data: M[Topic]): MaybePromise<void>;
```

Migrating existing code:

```diff
- pubsub.publish('topic', data);
+ await pubsub.publish('topic', data);
```

### `subscribe()` method signature change

Subscribe now returns an unsubscribe function instead of a subscription ID.

```diff
subscribe<Topic extends keyof Data>(
topic: Topic,
listener: PubSubListener<Data, Topic>
- ): number; // Returns subscription ID
+ ): MaybePromise<() => MaybePromise<void>>; // Returns unsubscribe function
```

Migrating existing code:

```diff
- const subId = pubsub.subscribe('topic', (data) => {
- console.log(data);
- });
- pubsub.unsubscribe(subId);
+ const unsubscribe = await pubsub.subscribe('topic', (data) => {
+ console.log(data);
+ });
+ await unsubscribe();
```

### `dispose()` method signature change

Disposal is now required and supports async operations.

```diff
- dispose?(): void; // Optional
+ dispose(): MaybePromise<void>; // Required, supports async
```

Migrating existing code:

```diff
- pubsub.dispose();
+ await pubsub.dispose();
```

### Removed `getEventNames()` method

This deprecated method was removed. Use `subscribedTopics()` instead.

```diff
- getEventNames(): Iterable<keyof Data>;
```

Migrating existing code:

```diff
- const topics = pubsub.getEventNames();
+ const topics = await pubsub.subscribedTopics();
```

### Removed `unsubscribe()` method

The centralized unsubscribe method was removed. Each subscription now returns its own unsubscribe function.

```diff
- unsubscribe(subId: number): void;
```

Migrating existing code by using the unsubscribe function returned by `subscribe()` instead:

```diff
- const subId = pubsub.subscribe('topic', listener);
- pubsub.unsubscribe(subId);

+ const unsubscribe = await pubsub.subscribe('topic', listener);
+ await unsubscribe();
```

### Removed `asyncIterator()` Method

The separate async iterator method was removed. Call `subscribe()` without a listener to get an async iterable.

```diff
- asyncIterator<Topic extends keyof Data>(topic: Topic): AsyncIterable<Data[Topic]>;
```

Migrating existing code:

```diff
- for await (const data of pubsub.asyncIterator('topic')) {
+ for await (const data of pubsub.subscribe('topic')) {
console.log(data);
}
```

### `MemPubSub` is the in-memory pubsub implementation

The generic `PubSub` class was replaced with implementation specific `MemPubSub` for an in-memory pubsub.

```diff
- import { PubSub } from '@graphql-hive/pubsub';
- const pubsub = new PubSub();
+ import { MemPubSub } from '@graphql-hive/pubsub';
+ const pubsub = new MemPubSub();
```
5 changes: 5 additions & 0 deletions .changeset/twelve-pets-run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-hive/pubsub': patch
---

Export TopicDataMap type for easier external implementations
3 changes: 3 additions & 0 deletions .github/workflows/examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ jobs:
- subscriptions-with-transforms
- type-merging-batching
- operation-field-permissions
- operation-field-permissions
# TODO: support containers before enabling (we use redis image in e2e)
# - distributed-subscriptions-webhooks
name: Convert ${{matrix.e2e}}
runs-on: ubuntu-latest
steps:
Expand Down
4 changes: 4 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
.yarn/*
yarn.lock
!.yarn/custom-plugins
Dockerfile
packages/runtime/src/landing-page.generated.ts
__generated__
__snapshots__
.changeset/*
!.changeset/README.md
!.changeset/config.json
Expand All @@ -15,3 +17,5 @@ __generated__
/e2e/load-on-init/malformed.graphql
CHANGELOG.md
/internal/heapsnapshot/dist/
*.tar.gz
*.tgz
Loading
Loading