Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ redis.zrange("sortedSet", 0, 2, "WITHSCORES").then((elements) => {
// The format is: redis[SOME_REDIS_COMMAND_IN_LOWERCASE](ARGUMENTS_ARE_JOINED_INTO_COMMAND_STRING)
// so the following statement is equivalent to the CLI: `redis> SET mykey hello EX 10`
redis.set("mykey", "hello", "EX", 10);

// Read the value of a redis key in chunks using nodejs streams
// Retuns a nodejs readable stream
const readStream = redis.getStream("mykey", { chunkSize: 100 * 1000 /* In Bytes */ });



```

See the `examples/` folder for more examples. For example:
Expand Down
36 changes: 36 additions & 0 deletions examples/get-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const http = require('node:http');
const ioredis = require('ioredis');

const client = new ioredis();

async function streamFromRedis(key, response) {
const dataStream = client.getStream(key, {
chunkSize: 100 * 10,
pipeline: false,
});

for await (const data of dataStream) {
response.write(data);
}

response.end();

}

async function sendFromRedis(key, response) {
const reply = await client.get(key);
response.end(reply);
}

const server = http.createServer();

server.on('request', (request, response) => {
if (request.url === '/stream') {
streamFromRedis('test', response).catch(console.error);
} else {
sendFromRedis('test', response).catch(console.error);
}
});

server.listen(3000);

8 changes: 7 additions & 1 deletion lib/utils/Commander.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Readable } from "node:stream";
import { list } from "@ioredis/commands";
import {
executeWithAutoPipelining,
Expand All @@ -6,7 +7,8 @@ import {
import Command, { ArgumentType } from "../Command";
import Script from "../Script";
import { Callback, WriteableStream } from "../types";
import RedisCommander, { ClientContext } from "./RedisCommander";
import RedisCommander, { ClientContext, GetStreamOptions, RedisKey } from "./RedisCommander";
import { createGetStream } from "./nodeStreams";

export interface CommanderOptions {
keyPrefix?: string;
Expand Down Expand Up @@ -115,6 +117,10 @@ Commander.prototype.callBuffer = generateFunction("callBuffer", null);
// @ts-expect-error
Commander.prototype.send_command = Commander.prototype.call;

Commander.prototype.getStream = function getStream(key: RedisKey, opts: GetStreamOptions = {}) {
return Readable.from(createGetStream(this, key, opts));
}

function generateFunction(functionName: string | null, _encoding: string);
function generateFunction(
functionName: string | null,
Expand Down
6 changes: 6 additions & 0 deletions lib/utils/RedisCommander.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
* this file.
*/

import { Readable } from "stream";
import { Callback } from "../types";

export type RedisKey = string | Buffer;
export type RedisValue = string | Buffer | number;
export type GetStreamOptions = { chunkSize?: number, pipeline?: boolean }

// Inspired by https://github.com/mmkal/handy-redis/blob/main/src/generated/interface.ts.
// Should be fixed with https://github.com/Microsoft/TypeScript/issues/1213
Expand Down Expand Up @@ -3666,6 +3668,10 @@ interface RedisCommander<Context extends ClientContext = { type: "default" }> {
key: RedisKey,
callback?: Callback<string | null>
): Result<string | null, Context>;
getStream(
key: RedisKey,
opts: GetStreamOptions
): Readable;
getBuffer(
key: RedisKey,
callback?: Callback<Buffer | null>
Expand Down
25 changes: 25 additions & 0 deletions lib/utils/nodeStreams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { executeWithAutoPipelining } from "../autoPipelining";
import Commander from "./Commander";
import { GetStreamOptions, RedisKey } from "./RedisCommander";

export async function* createGetStream(client: Commander, key: RedisKey, opts: GetStreamOptions = {}) {
const size = opts.chunkSize || 10 * 1000;
let cursor = 0;
let isReadable = true;
const isPipelineMode = opts.pipeline !== false;

while (isReadable) {
let chunk;
if (isPipelineMode) {
chunk = await executeWithAutoPipelining(client, 'getrange', 'range', [key, cursor, cursor + size - 1], null)
} else {
chunk = await client.getrange(key, cursor, cursor + size - 1);
}
if (!chunk || typeof chunk !== 'string' || chunk?.length === 0) {
isReadable = false;
} else {
cursor += chunk.length;
yield chunk;
}
}
};