Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cruel-suits-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@vercel/edge-config': minor
---

[experimental] stream updates during development
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ npm-debug.log
.turbo
.DS_Store
.vscode

packages/edge-config/DEVELOPMENT.md
3 changes: 2 additions & 1 deletion packages/edge-config/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
"testEnvironment": "node"
},
"dependencies": {
"@vercel/edge-config-fs": "workspace:*"
"@vercel/edge-config-fs": "workspace:*",
"eventsource-client": "1.2.0"
},
"devDependencies": {
"@changesets/cli": "2.28.1",
Expand Down
3 changes: 3 additions & 0 deletions packages/edge-config/src/index.common.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,16 @@ describe('connectionStrings', () => {

describe('in-memory cache with swr behaviour', () => {
const originalEnv = process.env.NODE_ENV;
const originalDisableStream = process.env.EDGE_CONFIG_DISABLE_STREAM;

beforeAll(() => {
process.env.NODE_ENV = 'development';
process.env.EDGE_CONFIG_DISABLE_STREAM = '1';
});

afterAll(() => {
process.env.NODE_ENV = originalEnv;
process.env.EDGE_CONFIG_DISABLE_STREAM = originalDisableStream;
});

it('use in-memory cache', async () => {
Expand Down
170 changes: 170 additions & 0 deletions packages/edge-config/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { readFile } from '@vercel/edge-config-fs';
import {
createEventSource,
type EventSourceClient,
type FetchLike,
} from 'eventsource-client';
import { name as sdkName, version as sdkVersion } from '../package.json';
import {
assertIsKey,
Expand Down Expand Up @@ -246,6 +251,105 @@ async function consumeResponseBody(res: Response): Promise<void> {
await res.arrayBuffer();
}

class StreamManager {
private stream: EventSourceClient;
private connection: Connection;
/**
* Callback with either the Edge Config or null when the Edge Config
* does not exist or the token is invalid.
*/
private onEdgeConfig: (edgeConfig: EmbeddedEdgeConfig | null) => void;
private resolveStreamUsable?: (value: boolean) => void;

private primedPromise: Promise<boolean> = new Promise<boolean>((resolve) => {
this.resolveStreamUsable = resolve;
});

constructor(
connection: Connection,
onEdgeConfig: (edgeConfig: EmbeddedEdgeConfig | null) => void,
) {
this.connection = connection;
this.onEdgeConfig = onEdgeConfig;

// TODO we can remove the custom fetch once eventstream-client supports
// seeing the status code. We only need this to be able to stop retrying
// on 401, 403, 404.
const fetchKeepResponse = (): FetchLike & {
status?: number;
statusText?: string;
} => {
const f: FetchLike & { status?: number; statusText?: string } = async (
url,
fetchInit,
) => {
f.status = undefined;
f.statusText = undefined;
const response = await fetch(url, fetchInit);
f.status = response.status;
f.statusText = response.statusText;
return response;
};
return f;
};

const customFetch = fetchKeepResponse();

this.stream = createEventSource({
url: `https://api.vercel.com/v1/edge-config/${this.connection.id}/stream`,
headers: { Authorization: `Bearer ${this.connection.token}` },
fetch: customFetch,
onDisconnect: () => {
if (!customFetch.status || customFetch.status >= 400) {
this.resolveStreamUsable?.(false);
this.stream.close();
}
},
});
}

async listen(): Promise<void> {
for await (const { data, event } of this.stream) {
if (event === 'status' && data === 'token_invalidated') {
this.stream.close();
return;
}

if (event === 'status' && data === 'primed') {
this.resolveStreamUsable?.(true);
continue;
}

if (event === 'embed') {
try {
const parsedEdgeConfig = JSON.parse(data) as EmbeddedEdgeConfig;
this.onEdgeConfig(parsedEdgeConfig);
} catch (e) {
// eslint-disable-next-line no-console -- intentional error logging
console.error(
'@vercel/edge-config: Error parsing streamed edge config',
e,
);
}
}
}

this.stream.close();
}

primed(): Promise<boolean> {
return this.primedPromise;
}

readyState(): 'open' | 'connecting' | 'closed' {
return this.stream.readyState;
}

close(): void {
this.stream.close();
}
}

interface EdgeConfigClientOptions {
/**
* The stale-if-error response directive indicates that the cache can reuse a
Expand All @@ -270,6 +374,11 @@ interface EdgeConfigClientOptions {
*/
disableDevelopmentCache?: boolean;

/**
* Disables the streaming of the Edge Config.
*/
disableStream?: boolean;

/**
* Sets a `cache` option on the `fetch` call made by Edge Config.
*
Expand Down Expand Up @@ -334,19 +443,48 @@ export const createClient = trace(
process.env.NODE_ENV === 'development' &&
process.env.EDGE_CONFIG_DISABLE_DEVELOPMENT_SWR !== '1';

const shouldUseStream =
!options.disableStream &&
process.env.NODE_ENV === 'development' &&
process.env.EDGE_CONFIG_DISABLE_STREAM !== '1';

const getInMemoryEdgeConfig = createGetInMemoryEdgeConfig(
shouldUseDevelopmentCache,
connection,
headers,
fetchCache,
);

let streamManager: StreamManager | null = null;

let streamedEdgeConfig: EmbeddedEdgeConfig | null = null;
if (shouldUseStream) {
streamManager = new StreamManager(connection, (edgeConfig) => {
streamedEdgeConfig = edgeConfig;
});

void streamManager.listen().catch(() => {
// reset streamedEdgeConfig so it does not get used when there was an
// unexpected error with the stream
streamedEdgeConfig = null;
});
}

const api: Omit<EdgeConfigClient, 'connection'> = {
get: trace(
async function get<T = EdgeConfigValue>(
key: string,
localOptions?: EdgeConfigFunctionsOptions,
): Promise<T | undefined> {
if (
streamManager &&
streamedEdgeConfig &&
streamManager.readyState() !== 'closed' &&
(await streamManager.primed())
) {
return streamedEdgeConfig.items[key] as T;
}

const localEdgeConfig =
(await getInMemoryEdgeConfig(localOptions)) ||
(await getLocalEdgeConfig(connection, localOptions));
Expand Down Expand Up @@ -397,6 +535,15 @@ export const createClient = trace(
key,
localOptions?: EdgeConfigFunctionsOptions,
): Promise<boolean> {
if (
streamManager &&
streamedEdgeConfig &&
streamManager.readyState() !== 'closed' &&
(await streamManager.primed())
) {
return hasOwnProperty(streamedEdgeConfig.items, key);
}

const localEdgeConfig =
(await getInMemoryEdgeConfig(localOptions)) ||
(await getLocalEdgeConfig(connection, localOptions));
Expand Down Expand Up @@ -438,6 +585,20 @@ export const createClient = trace(
keys?: (keyof T)[],
localOptions?: EdgeConfigFunctionsOptions,
): Promise<T> {
if (
streamManager &&
streamedEdgeConfig &&
streamManager.readyState() !== 'closed' &&
(await streamManager.primed())
) {
if (keys === undefined) {
return streamedEdgeConfig.items as T;
}

assertIsKeys(keys);
return pick(streamedEdgeConfig.items, keys) as T;
}

const localEdgeConfig =
(await getInMemoryEdgeConfig(localOptions)) ||
(await getLocalEdgeConfig(connection, localOptions));
Expand Down Expand Up @@ -497,6 +658,15 @@ export const createClient = trace(
async function digest(
localOptions?: EdgeConfigFunctionsOptions,
): Promise<string> {
if (
streamManager &&
streamedEdgeConfig &&
streamManager.readyState() !== 'closed' &&
(await streamManager.primed())
) {
return streamedEdgeConfig.digest;
}

const localEdgeConfig =
(await getInMemoryEdgeConfig(localOptions)) ||
(await getLocalEdgeConfig(connection, localOptions));
Expand Down
Loading