diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e967f10..50d3332 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,10 +9,6 @@ jobs: matrix: node-version: [16.x, 18.x, 20.x, '*'] - env: - WASI_VERSION: 14 - WASI_SDK_PATH: /tmp/wasi-sdk - steps: - uses: actions/checkout@v3 with: @@ -25,9 +21,7 @@ jobs: - name: Set up WASI-SDK run: | - wget https://github.com/WebAssembly/wasi-sdk/releases/download/wasi-sdk-${WASI_VERSION}/wasi-sdk-${WASI_VERSION}.0-linux.tar.gz - mkdir -p $WASI_SDK_PATH - tar xvf wasi-sdk-*-linux.tar.gz -C $WASI_SDK_PATH --strip-components=1 + npm run setup-wasi-sdk - uses: actions/setup-node@v3 with: @@ -36,7 +30,7 @@ jobs: - run: npm install - name: Build & test - run: wasisdkroot=$WASI_SDK_PATH make && npm run test + run: make && npm run test env: # Legacy provider required for old webpack version in new Node releases: NODE_OPTIONS: ${{ matrix.node-version != '16.x' && '--openssl-legacy-provider' || '' }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index d6df0f6..503dc16 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ sample/lib/ sample/data/random* sample/data/sample.wasm.xz sample/data/sample.wasm-brotli.br +wasi-sdk/ diff --git a/Makefile b/Makefile index ccf0928..9f840a3 100644 --- a/Makefile +++ b/Makefile @@ -2,15 +2,10 @@ xzdir := module/xz-embedded xzlibdir := $(xzdir)/linux/lib/xz ifeq ($(wasisdkroot),) - $(error wasisdkroot is not set) + wasisdkroot := wasi-sdk endif -ifeq ($(shell grep -o WSL2 /proc/version ),WSL2) - # Runs a lot faster for me - webpackcommand := cmd.exe /c npm run webpack -else - webpackcommand := npm run webpack -endif +webpackcommand := node_modules/.bin/webpack .PHONY: all clean sample run-sample package @@ -64,6 +59,6 @@ run-sample: clean: rm -rf dist rm -rf sample/lib - rm sample/data/random* - rm sample/data/sample.wasm.xz - rm sample/data/sample.wasm-brotli.br + rm -f sample/data/random* + rm -f sample/data/sample.wasm.xz + rm -f sample/data/sample.wasm-brotli.br diff --git a/README.md b/README.md index e517905..ebc6c6d 100644 --- a/README.md +++ b/README.md @@ -54,8 +54,8 @@ XZ-Decompress doesn't have built-in support for `.tar`. However, you can use it * Clone/update submodules * `git submodule update --init --recursive` * Ensure you have a working Clang toolchain that can build wasm - * For example, install https://github.com/WebAssembly/wasi-sdk - * `export wasisdkroot=/path/to/wask-sdk` + * If running on Linux: `npm run setup-wasi-sdk` + * Otherwise you can install https://github.com/WebAssembly/wasi-sdk and `export wasisdkroot=/path/to/wasi-sdk` * (For testing only) Ensure you have `xz` and `brotli` available as commands on $PATH * Run `make` diff --git a/package.json b/package.json index 1f30f74..029f73a 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ }, "scripts": { "test": "mocha -r ts-node/register 'test/**/*.spec.ts'", + "setup-wasi-sdk": "wasisdkroot=wasi-sdk && wasi_version=21 && rm -rf \"${wasisdkroot}\" && mkdir -p \"${wasisdkroot}\" && curl -fL -# \"https://github.com/WebAssembly/wasi-sdk/releases/download/wasi-sdk-${wasi_version}/wasi-sdk-${wasi_version}.0-linux.tar.gz\" | tar -xzf - -C \"${wasisdkroot}\" --strip-components 1", "webpack": "webpack" }, "repository": { diff --git a/src/xz-decompress.js b/src/xz-decompress.js index 80020ec..8b63409 100644 --- a/src/xz-decompress.js +++ b/src/xz-decompress.js @@ -63,9 +63,40 @@ class XzContext { } } +// Simple mutex to serialize context creation and prevent resource exhaustion +class ContextMutex { + constructor() { + this.locked = false; + this.waitQueue = []; + } + + async acquire() { + if (!this.locked) { + this.locked = true; + return; + } + + // Wait in queue + return new Promise((resolve) => { + this.waitQueue.push(resolve); + }); + } + + release() { + if (this.waitQueue.length > 0) { + const next = this.waitQueue.shift(); + next(); + } else { + this.locked = false; + } + } +} + export class XzReadableStream extends ReadableStream { static _moduleInstancePromise; static _moduleInstance; + static _contextMutex = new ContextMutex(); + static async _getModuleInstance() { const base64Wasm = xzwasmBytes.replace('data:application/wasm;base64,', ''); const wasmBytes = Uint8Array.from(atob(base64Wasm), c => c.charCodeAt(0)).buffer; @@ -81,37 +112,59 @@ export class XzReadableStream extends ReadableStream { super({ async start(controller) { - if (!XzReadableStream._moduleInstance) { - await (XzReadableStream._moduleInstancePromise || (XzReadableStream._moduleInstancePromise = XzReadableStream._getModuleInstance())); + await XzReadableStream._contextMutex.acquire(); + + try { + if (!XzReadableStream._moduleInstance) { + await (XzReadableStream._moduleInstancePromise || (XzReadableStream._moduleInstancePromise = XzReadableStream._getModuleInstance())); + } + xzContext = new XzContext(XzReadableStream._moduleInstance); + } catch (error) { + XzReadableStream._contextMutex.release(); + throw error; } - xzContext = new XzContext(XzReadableStream._moduleInstance); }, async pull(controller) { - if (xzContext.needsMoreInput()) { - if (unconsumedInput === null || unconsumedInput.byteLength === 0) { - const { done, value } = await compressedReader.read(); - if (!done) { - unconsumedInput = value; + try { + if (xzContext.needsMoreInput()) { + if (unconsumedInput === null || unconsumedInput.byteLength === 0) { + const { done, value } = await compressedReader.read(); + if (!done) { + unconsumedInput = value; + } } + const nextInputLength = Math.min(xzContext.bufSize, unconsumedInput.byteLength); + xzContext.supplyInput(unconsumedInput.subarray(0, nextInputLength)); + unconsumedInput = unconsumedInput.subarray(nextInputLength); } - const nextInputLength = Math.min(xzContext.bufSize, unconsumedInput.byteLength); - xzContext.supplyInput(unconsumedInput.subarray(0, nextInputLength)); - unconsumedInput = unconsumedInput.subarray(nextInputLength); - } - const nextOutputResult = xzContext.getNextOutput(); - controller.enqueue(nextOutputResult.outChunk); - xzContext.resetOutputBuffer(); + const nextOutputResult = xzContext.getNextOutput(); + controller.enqueue(nextOutputResult.outChunk); + xzContext.resetOutputBuffer(); - if (nextOutputResult.finished) { - xzContext.dispose(); // Not sure if this always happens - controller.close(); + if (nextOutputResult.finished) { + xzContext.dispose(); + XzReadableStream._contextMutex.release(); + controller.close(); + } + } catch (error) { + if (xzContext) { + xzContext.dispose(); + } + XzReadableStream._contextMutex.release(); + throw error; } }, cancel() { - xzContext.dispose(); // Not sure if this always happens - return compressedReader.cancel(); + try { + if (xzContext) { + xzContext.dispose(); + } + return compressedReader.cancel(); + } finally { + XzReadableStream._contextMutex.release(); + } } }); } diff --git a/test/test.spec.ts b/test/test.spec.ts index a256fd6..875203d 100644 --- a/test/test.spec.ts +++ b/test/test.spec.ts @@ -47,4 +47,99 @@ describe("Streaming XS decompression", () => { expect(result1).to.equal('hello world\n'); expect(result2).to.equal('hello world\n'); }); + + it("can decompress many demo test streams in parallel", async () => { + const streams = []; + const promises = []; + + for (let i = 0; i < 10_000; i++) { + const dataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64')); + const stream = new XzReadableStream(dataStream); + streams.push(stream); + promises.push(collectOutputString(stream)); + } + + const results = await Promise.all(promises); + + for (const result of results) { + expect(result).to.equal('hello world\n'); + } + }); + + it("handles errors without causing deadlocks", async () => { + // Create a stream that will fail during decompression + const badDataStream = buildStaticDataStream(Buffer.from('invalid-xz-data', 'utf8')); + const stream = new XzReadableStream(badDataStream); + + // The stream should fail, but not hang + try { + await collectOutputString(stream); + expect.fail('Expected stream to throw an error'); + } catch (error) { + // This is expected - the stream should fail with an error + expect(error).to.be.an('error'); + } + + // After the error, we should be able to create new streams successfully + // This verifies that the mutex was properly released + const validDataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64')); + const validStream = new XzReadableStream(validDataStream); + + const result = await collectOutputString(validStream); + expect(result).to.equal('hello world\n'); + }); + + it("handles stream cancellation without deadlocks", async () => { + const dataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64')); + const stream = new XzReadableStream(dataStream); + + // Start reading the stream + const reader = stream.getReader(); + const { value } = await reader.read(); + expect(value).to.be.instanceOf(Uint8Array); + + // Cancel the stream + await reader.cancel(); + + // After cancellation, we should be able to create new streams successfully + // This verifies that the mutex was properly released during cancellation + const validDataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64')); + const validStream = new XzReadableStream(validDataStream); + + const result = await collectOutputString(validStream); + expect(result).to.equal('hello world\n'); + }); + + it("handles multiple errors without permanent deadlocks", async () => { + const promises = []; + + // Create multiple streams that will fail + for (let i = 0; i < 5; i++) { + const badDataStream = buildStaticDataStream(Buffer.from(`invalid-xz-data-${i}`, 'utf8')); + const stream = new XzReadableStream(badDataStream); + promises.push( + collectOutputString(stream).catch(error => { + // Expected to fail + expect(error).to.be.an('error'); + return null; // Mark as handled + }) + ); + } + + // Wait for all streams to fail + const results = await Promise.all(promises); + + // All should have failed (returned null from catch) + for (const result of results) { + expect(result).to.be.null; + } + + // After all errors, we should still be able to create valid streams + // This verifies that the mutex isn't permanently locked + const validDataStream = buildStaticDataStream(Buffer.from(HELLO_WORLD_XZ, 'base64')); + const validStream = new XzReadableStream(validDataStream); + + const result = await collectOutputString(validStream); + expect(result).to.equal('hello world\n'); + }); }); \ No newline at end of file