Skip to content

Fix concurrency issues #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ jobs:
matrix:
node-version: [16.x, 18.x, 20.x, '*']

env:
WASI_VERSION: 14
WASI_SDK_PATH: /tmp/wasi-sdk

steps:
- uses: actions/checkout@v3
with:
Expand All @@ -25,9 +21,7 @@ jobs:

- name: Set up WASI-SDK
run: |
wget https://github.com/WebAssembly/wasi-sdk/releases/download/wasi-sdk-${WASI_VERSION}/wasi-sdk-${WASI_VERSION}.0-linux.tar.gz
mkdir -p $WASI_SDK_PATH
tar xvf wasi-sdk-*-linux.tar.gz -C $WASI_SDK_PATH --strip-components=1
npm run setup-wasi-sdk

- uses: actions/setup-node@v3
with:
Expand All @@ -36,7 +30,7 @@ jobs:
- run: npm install

- name: Build & test
run: wasisdkroot=$WASI_SDK_PATH make && npm run test
run: make && npm run test
env:
# Legacy provider required for old webpack version in new Node releases:
NODE_OPTIONS: ${{ matrix.node-version != '16.x' && '--openssl-legacy-provider' || '' }}
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ sample/lib/
sample/data/random*
sample/data/sample.wasm.xz
sample/data/sample.wasm-brotli.br
wasi-sdk/
15 changes: 5 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ xzdir := module/xz-embedded
xzlibdir := $(xzdir)/linux/lib/xz

ifeq ($(wasisdkroot),)
$(error wasisdkroot is not set)
wasisdkroot := wasi-sdk
endif

ifeq ($(shell grep -o WSL2 /proc/version ),WSL2)
# Runs a lot faster for me
webpackcommand := cmd.exe /c npm run webpack
else
webpackcommand := npm run webpack
endif
webpackcommand := node_modules/.bin/webpack

.PHONY: all clean sample run-sample package

Expand Down Expand Up @@ -64,6 +59,6 @@ run-sample:
clean:
rm -rf dist
rm -rf sample/lib
rm sample/data/random*
rm sample/data/sample.wasm.xz
rm sample/data/sample.wasm-brotli.br
rm -f sample/data/random*
rm -f sample/data/sample.wasm.xz
rm -f sample/data/sample.wasm-brotli.br
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ XZ-Decompress doesn't have built-in support for `.tar`. However, you can use it
* Clone/update submodules
* `git submodule update --init --recursive`
* Ensure you have a working Clang toolchain that can build wasm
* For example, install https://github.com/WebAssembly/wasi-sdk
* `export wasisdkroot=/path/to/wask-sdk`
* If running on Linux: `npm run setup-wasi-sdk`
* Otherwise you can install https://github.com/WebAssembly/wasi-sdk and `export wasisdkroot=/path/to/wasi-sdk`
* (For testing only) Ensure you have `xz` and `brotli` available as commands on $PATH
* Run `make`

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
},
"scripts": {
"test": "mocha -r ts-node/register 'test/**/*.spec.ts'",
"setup-wasi-sdk": "wasisdkroot=wasi-sdk && wasi_version=21 && rm -rf \"${wasisdkroot}\" && mkdir -p \"${wasisdkroot}\" && curl -fL -# \"https://github.com/WebAssembly/wasi-sdk/releases/download/wasi-sdk-${wasi_version}/wasi-sdk-${wasi_version}.0-linux.tar.gz\" | tar -xzf - -C \"${wasisdkroot}\" --strip-components 1",
"webpack": "webpack"
},
"repository": {
Expand Down
93 changes: 73 additions & 20 deletions src/xz-decompress.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,40 @@ class XzContext {
}
}

// Simple mutex to serialize context creation and prevent resource exhaustion
class ContextMutex {
constructor() {
this.locked = false;
this.waitQueue = [];
}

async acquire() {
if (!this.locked) {
this.locked = true;
return;
}

// Wait in queue
return new Promise((resolve) => {
this.waitQueue.push(resolve);
});
}

release() {
if (this.waitQueue.length > 0) {
const next = this.waitQueue.shift();
next();
} else {
this.locked = false;
}
}
}

export class XzReadableStream extends ReadableStream {
static _moduleInstancePromise;
static _moduleInstance;
static _contextMutex = new ContextMutex();

static async _getModuleInstance() {
const base64Wasm = xzwasmBytes.replace('data:application/wasm;base64,', '');
const wasmBytes = Uint8Array.from(atob(base64Wasm), c => c.charCodeAt(0)).buffer;
Expand All @@ -81,37 +112,59 @@ export class XzReadableStream extends ReadableStream {

super({
async start(controller) {
if (!XzReadableStream._moduleInstance) {
await (XzReadableStream._moduleInstancePromise || (XzReadableStream._moduleInstancePromise = XzReadableStream._getModuleInstance()));
await XzReadableStream._contextMutex.acquire();

try {
if (!XzReadableStream._moduleInstance) {
await (XzReadableStream._moduleInstancePromise || (XzReadableStream._moduleInstancePromise = XzReadableStream._getModuleInstance()));
}
xzContext = new XzContext(XzReadableStream._moduleInstance);
} catch (error) {
XzReadableStream._contextMutex.release();
throw error;
}
xzContext = new XzContext(XzReadableStream._moduleInstance);
},

async pull(controller) {
if (xzContext.needsMoreInput()) {
if (unconsumedInput === null || unconsumedInput.byteLength === 0) {
const { done, value } = await compressedReader.read();
if (!done) {
unconsumedInput = value;
try {
if (xzContext.needsMoreInput()) {
if (unconsumedInput === null || unconsumedInput.byteLength === 0) {
const { done, value } = await compressedReader.read();
if (!done) {
unconsumedInput = value;
}
}
const nextInputLength = Math.min(xzContext.bufSize, unconsumedInput.byteLength);
xzContext.supplyInput(unconsumedInput.subarray(0, nextInputLength));
unconsumedInput = unconsumedInput.subarray(nextInputLength);
}
const nextInputLength = Math.min(xzContext.bufSize, unconsumedInput.byteLength);
xzContext.supplyInput(unconsumedInput.subarray(0, nextInputLength));
unconsumedInput = unconsumedInput.subarray(nextInputLength);
}

const nextOutputResult = xzContext.getNextOutput();
controller.enqueue(nextOutputResult.outChunk);
xzContext.resetOutputBuffer();
const nextOutputResult = xzContext.getNextOutput();
controller.enqueue(nextOutputResult.outChunk);
xzContext.resetOutputBuffer();

if (nextOutputResult.finished) {
xzContext.dispose(); // Not sure if this always happens
controller.close();
if (nextOutputResult.finished) {
xzContext.dispose();
XzReadableStream._contextMutex.release();
controller.close();
}
} catch (error) {
if (xzContext) {
xzContext.dispose();
}
XzReadableStream._contextMutex.release();
throw error;
}
},
cancel() {
xzContext.dispose(); // Not sure if this always happens
return compressedReader.cancel();
try {
if (xzContext) {
xzContext.dispose();
}
return compressedReader.cancel();
} finally {
XzReadableStream._contextMutex.release();
}
}
});
}
Expand Down
95 changes: 95 additions & 0 deletions test/test.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,99 @@ describe("Streaming XS decompression", () => {
expect(result1).to.equal('hello world\n');
expect(result2).to.equal('hello world\n');
});

it("can decompress many demo test streams in parallel", async () => {
const streams = [];
const promises = [];

for (let i = 0; i < 10_000; i++) {
const dataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64'));
const stream = new XzReadableStream(dataStream);
streams.push(stream);
promises.push(collectOutputString(stream));
}

const results = await Promise.all(promises);

for (const result of results) {
expect(result).to.equal('hello world\n');
}
});

it("handles errors without causing deadlocks", async () => {
// Create a stream that will fail during decompression
const badDataStream = buildStaticDataStream(Buffer.from('invalid-xz-data', 'utf8'));
const stream = new XzReadableStream(badDataStream);

// The stream should fail, but not hang
try {
await collectOutputString(stream);
expect.fail('Expected stream to throw an error');
} catch (error) {
// This is expected - the stream should fail with an error
expect(error).to.be.an('error');
}

// After the error, we should be able to create new streams successfully
// This verifies that the mutex was properly released
const validDataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64'));
const validStream = new XzReadableStream(validDataStream);

const result = await collectOutputString(validStream);
expect(result).to.equal('hello world\n');
});

it("handles stream cancellation without deadlocks", async () => {
const dataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64'));
const stream = new XzReadableStream(dataStream);

// Start reading the stream
const reader = stream.getReader();
const { value } = await reader.read();
expect(value).to.be.instanceOf(Uint8Array);

// Cancel the stream
await reader.cancel();

// After cancellation, we should be able to create new streams successfully
// This verifies that the mutex was properly released during cancellation
const validDataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64'));
const validStream = new XzReadableStream(validDataStream);

const result = await collectOutputString(validStream);
expect(result).to.equal('hello world\n');
});

it("handles multiple errors without permanent deadlocks", async () => {
const promises = [];

// Create multiple streams that will fail
for (let i = 0; i < 5; i++) {
const badDataStream = buildStaticDataStream(Buffer.from(`invalid-xz-data-${i}`, 'utf8'));
const stream = new XzReadableStream(badDataStream);
promises.push(
collectOutputString(stream).catch(error => {
// Expected to fail
expect(error).to.be.an('error');
return null; // Mark as handled
})
);
}

// Wait for all streams to fail
const results = await Promise.all(promises);

// All should have failed (returned null from catch)
for (const result of results) {
expect(result).to.be.null;
}

// After all errors, we should still be able to create valid streams
// This verifies that the mutex isn't permanently locked
const validDataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64'));
const validStream = new XzReadableStream(validDataStream);

const result = await collectOutputString(validStream);
expect(result).to.equal('hello world\n');
});
});