Skip to content

[PHP] Support non-blocking read streams #2339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Jul 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9ad80ba
Support non-blocking read streams
adamziel Jul 5, 2025
ba2342e
Comment out O_DIRECT
adamziel Jul 5, 2025
108ddac
Comment out O_NOATIME
adamziel Jul 5, 2025
a510512
Comment out O_NOATIME
adamziel Jul 5, 2025
ecf1e4a
Redefine O_ macros
adamziel Jul 5, 2025
68525f6
Reformat
adamziel Jul 5, 2025
441efc4
Add a unit test for non-blocking proc_open fd
adamziel Jul 5, 2025
383db9e
Remove special casing for PHPWASM.child_proc_by_fd. Rely on streaming…
adamziel Jul 7, 2025
c5920b2
Non-blocking streams support without refactoring polling
adamziel Jul 8, 2025
212623b
Rebuild Asyncify PHP for Node.js
adamziel Jul 8, 2025
afa7fdf
Rebuild JSPI PHP for Node.js
adamziel Jul 8, 2025
ae16805
Rebuild PHP for the web
adamziel Jul 8, 2025
4339f3b
lint
adamziel Jul 8, 2025
8187077
Merge branch 'trunk' into partial-nonblocking-streams-support
adamziel Jul 8, 2025
35d6272
Harmonizing the ProcessAPI implementation with the new pipes contract
adamziel Jul 9, 2025
84011b0
adamziel Jul 9, 2025
056ce51
Get proc_open() tests to work
adamziel Jul 9, 2025
ef923fd
Inspect proc_open() tests and get them to work
adamziel Jul 9, 2025
eb43b67
Rebuild PHP 7.4 with updated process pipes
adamziel Jul 9, 2025
5b434e8
Clean up phpwasm-emscripten-library.js, remove parts we don't need an…
adamziel Jul 9, 2025
24b49df
Pump() the data periodically in js_open_process
adamziel Jul 9, 2025
1bdf692
Simplify Stdin handling – remove stream_ops overrides
adamziel Jul 9, 2025
c7a218f
Final cleanup
adamziel Jul 9, 2025
f01a360
Rebuild a few PHP versions
adamziel Jul 9, 2025
c86e174
Merge branch 'trunk' into partial-nonblocking-streams-support
adamziel Jul 9, 2025
7e81cd9
Lint, more PHP versions rebuilt
adamziel Jul 9, 2025
b4f5fcd
Lint, more PHP versions rebuilt
adamziel Jul 9, 2025
7ad008c
Lint, more PHP versions rebuilt
adamziel Jul 9, 2025
0d11a91
Lint, more PHP versions rebuilt
adamziel Jul 10, 2025
04a2dff
Rebuild PHP 7.4
adamziel Jul 10, 2025
59970a3
Instrument jspi builds
adamziel Jul 11, 2025
dfc381d
Rebuild Node.js PHP JSPI
adamziel Jul 12, 2025
349fa52
Rebuild web PHP
adamziel Jul 12, 2025
aec292b
Await php_wasm_init
adamziel Jul 13, 2025
98d5ff9
Remove JSPI-specific workaround
adamziel Jul 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/php-wasm/compile/php/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ RUN export ASYNCIFY_IMPORTS=$'[\n\
"cli",\
"close_stmt_and_copy_errors",\
"close",\
"__stdio_close",\
"closeUnixFile",\
"compile_file",\
"createCollation",\
Expand Down Expand Up @@ -2060,7 +2061,7 @@ RUN set -euxo pipefail; \
source /root/emsdk/emsdk_env.sh; \
if [ "$WITH_JSPI" = "yes" ]; then \
# Both imports and exports are required for inter-module communication with wrapped methods, e.g., wasm_recv.
export ASYNCIFY_FLAGS=" -s ASYNCIFY=2 -sSUPPORT_LONGJMP=wasm -fwasm-exceptions -sJSPI_IMPORTS=js_open_process,js_waitpid,js_process_status,js_create_input_device,wasm_setsockopt,wasm_shutdown,wasm_close,wasm_recv -sJSPI_EXPORTS=wasm_sleep,wasm_read,emscripten_sleep,wasm_sapi_handle_request,wasm_sapi_request_shutdown,wasm_poll_socket,wrap_select,__wrap_select,select,php_pollfd_for,fflush,wasm_popen,wasm_read,wasm_php_exec,run_cli,wasm_recv -s EXTRA_EXPORTED_RUNTIME_METHODS=ccall,PROXYFS,wasmExports,_malloc "; \
export ASYNCIFY_FLAGS=" -s ASYNCIFY=2 -sSUPPORT_LONGJMP=wasm -fwasm-exceptions -sJSPI_IMPORTS=js_open_process,js_fd_read,js_waitpid,js_process_status,js_create_input_device,wasm_setsockopt,wasm_shutdown,wasm_close,wasm_recv,__syscall_fcntl64,js_flock,js_release_file_locks,js_waitpid,fd_close -sJSPI_EXPORTS=php_wasm_init,fd_close,wasm_sleep,wasm_read,emscripten_sleep,wasm_sapi_handle_request,wasm_sapi_request_shutdown,wasm_poll_socket,wrap_select,__wrap_select,select,php_pollfd_for,fflush,wasm_popen,wasm_read,wasm_php_exec,run_cli,wasm_recv -s EXTRA_EXPORTED_RUNTIME_METHODS=ccall,PROXYFS,wasmExports,_malloc "; \
echo '#define PLAYGROUND_JSPI 1' > /root/php_wasm_asyncify.h; \
else \
export ASYNCIFY_FLAGS=" -s ASYNCIFY=1 -s ASYNCIFY_IGNORE_INDIRECT=1 -s EXPORTED_RUNTIME_METHODS=ccall,PROXYFS,wasmExports $(cat /root/.emcc-php-asyncify-flags) "; \
Expand Down Expand Up @@ -2134,6 +2135,7 @@ RUN set -euxo pipefail; \
-o /build/output/php.js \
-s EXIT_RUNTIME=1 \
-Wl,--wrap=select \
-Wl,--wrap=usleep \
/root/lib/libphp.a \
/root/proc_open.c \
/root/php_wasm.c \
Expand Down Expand Up @@ -2194,6 +2196,9 @@ RUN set -euxo pipefail; \
/root/replace.sh "s/sock\.server\s*=\s*new WebSocketServer/if (Module['websocket']['serverDecorator']) {WebSocketServer = Module['websocket']['serverDecorator'](WebSocketServer);}sock.server = new WebSocketServer/g" /root/output/php.js; \
fi; \
fi; \
# PIPEFS: return 0 (success) instead of 6 (EWOULDBLOCK) when reached end of data
# and the other end of the pipe is closed.
/root/replace-across-lines.sh 's/if\s*\(\s*currentLength\s*==\s*0\s*\)\s*\{.{0,200}throw new FS.ErrnoError\(\s*6\s*\)/if(currentLength==0){if(pipe.refcnt<2){return 0;}throw new FS.ErrnoError(6)/sg' /root/output/php.js; \
# Add MSG_PEEK flag support in recvfrom
#
# Emscripten ignores the flags argument to ___syscall_recvfrom.
Expand Down
22 changes: 0 additions & 22 deletions packages/php-wasm/compile/php/php7.4.patch
Original file line number Diff line number Diff line change
@@ -1,25 +1,3 @@
diff --git a/php-src/main/streams/plain_wrapper.c b/php-src/main/streams/plain_wrapper.c
index 4d10e688b5..ca50261b8a 100644
--- a/php-src/main/streams/plain_wrapper.c
+++ b/php-src/main/streams/plain_wrapper.c
@@ -408,6 +408,15 @@ static ssize_t php_stdiop_read(php_stream *stream, char *buf, size_t count)
}
#endif
ret = read(data->fd, buf, PLAIN_WRAP_BUF_SIZE(count));
+ if(
+ (errno == EAGAIN || errno == EWOULDBLOCK) &&
+ (stream->flags & O_NONBLOCK) == 0
+ ) {
+ /* This is a blocking read in C but not in Emscripten.
+ Let's poll for data and try once again */
+ php_pollfd_for(data->fd, POLLIN, NULL);
+ ret = read(data->fd, buf, PLAIN_WRAP_BUF_SIZE(count));
+ }

if (ret == (size_t)-1 && errno == EINTR) {
/* Read was interrupted, retry once,


diff --git a/php-src/ext/standard/file.c b/php-src/ext/standard/file.c
--- a/php-src/ext/standard/file.c
+++ b/php-src/ext/standard/file.c
Expand Down
173 changes: 115 additions & 58 deletions packages/php-wasm/compile/php/php_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ unsigned int wasm_sleep(unsigned int time)
return time;
}

/**
* Shims usleep(3) functionallity.
*/
EMSCRIPTEN_KEEPALIVE unsigned int __wrap_usleep(unsigned int time_microseconds)
{
// We don't have access to microsecond granularity in JavaScript so let's
// settle for milliseconds.
int time_milliseconds = time_microseconds / 1000;
emscripten_sleep(time_milliseconds);
return time_microseconds;
}

extern int *wasm_setsockopt(int sockfd, int level, int optname, intptr_t optval, size_t optlen, int dummy);

static int redirect_stream_to_file(FILE *stream, char *file_path);
Expand Down Expand Up @@ -151,11 +163,16 @@ EM_JS(int, wasm_poll_socket, (php_socket_t socketd, int events, int timeout), {
return returnCallback((wakeUp) => {
const polls = [];
/**
* Check for socket-ness first. We don't clean up child_proc_by_fd yet and
* sometimes get duplicate entries. isSocket is more reliable out of the two –
* let's check for it first.
* Special semantics for polling sockets.
*
* @TODO: Remove this code branch entirely and poll sockets using the second if/else
* branch below. The only reason this wasn't done yet is that the original PR
* was focusing on removing child process-related special casing and did not
* want to increase the risk of breaking other code, especially around listening
* network sockets.
*/
if (FS.isSocket(FS.getStream(socketd)?.node.mode)) {
const stream = FS.getStream(socketd);
if (FS.isSocket(stream?.node.mode)) {
// This is, most likely, a websocket. Let's make sure.
const sock = getSocketFromFD(socketd);
if (!sock) {
Expand Down Expand Up @@ -208,14 +225,40 @@ EM_JS(int, wasm_poll_socket, (php_socket_t socketd, int events, int timeout), {
lookingFor.add('POLLERR');
}
}
} else if (socketd in PHPWASM.child_proc_by_fd) {
// This is a child process-related socket.
const procInfo = PHPWASM.child_proc_by_fd[socketd];
if (procInfo.exited) {
wakeUp(0);
return;
}
polls.push(PHPWASM.awaitEvent(procInfo.stdout, 'data'));
} else if (stream?.stream_ops?.poll) {
// Poll the stream for data.
// @TODO: Consider reusing the polling implementation in js_fd_read().
let interrupted = false;
async function poll() {
try {
// Inlined ___syscall_poll with added await support:
while (true) {
var mask = POLLNVAL;
mask = SYSCALLS.DEFAULT_POLLMASK;
if (stream.stream_ops?.poll) {
mask = stream.stream_ops.poll(stream, -1);
}

mask &= events | POLLERR | POLLHUP;
if (mask) {
return mask;
}
if (interrupted) {
return ERRNO_CODES.ETIMEDOUT;
}
await new Promise(resolve => setTimeout(resolve, 10));
}
} catch (e) {
if (typeof FS == 'undefined' || !(e.name === 'ErrnoError')) throw e;
return -e.errno;
}
}
polls.push([
poll(),
() => {
interrupted = true;
}
]);
} else {
setTimeout(function () {
wakeUp(1);
Expand Down Expand Up @@ -271,22 +314,20 @@ EM_JS(int, wasm_poll_socket, (php_socket_t socketd, int events, int timeout), {
* @see https://github.com/WordPress/wordpress-playground/issues/951
* @see https://github.com/emscripten-core/emscripten/issues/13214
*/
EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, size_t iovcnt, __wasi_size_t *pnum), {
#ifdef PLAYGROUND_JSPI
EM_ASYNC_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, size_t iovcnt, __wasi_size_t *pnum), {
const returnCallback = (resolver) => new Promise(resolver);
#else
EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, size_t iovcnt, __wasi_size_t *pnum), {
const returnCallback = (resolver) => Asyncify.handleSleep(resolver);
#endif
const pollAsync = arguments[4] === undefined ? true : !!arguments[4];
if (Asyncify?.State?.Normal === undefined || Asyncify?.state === Asyncify?.State?.Normal) {
var returnCode;
var stream;
let num = 0;
try
{
stream = SYSCALLS.getStreamFromFD(fd);
const num = doReadv(stream, iov, iovcnt);
HEAPU32[pnum >> 2] = num;
// How many bytes did we read?
HEAPU32[pnum >> 2] = doReadv(stream, iov, iovcnt);
return 0;
}
catch (e)
Expand All @@ -296,26 +337,38 @@ EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, si
{
throw e;
}
// Only return synchronously if this isn't an asynchronous pipe.
// Error code 6 indicates EWOULDBLOCK – this is our signal to wait.
// We also need to distinguish between a process pipe and a file pipe, otherwise
// reading from an empty file would block until the timeout.
if (e.errno !== 6 || !(stream?.fd in PHPWASM.child_proc_by_fd))
{
// On failure, yield 0 bytes read to indicate EOF.
HEAPU32[pnum >> 2] = 0;
return returnCode

// Propagate all errors except ones indicating that the stream is waiting for
// more data.
if (e.errno !== ERRNO_CODES.EWOULDBLOCK && e.errno !== ERRNO_CODES.EAGAIN) {
return e.errno;
}

// If the stream is non-blocking, we can return immediately.
const nonBlocking = stream.flags & PHPWASM.O_NONBLOCK;
if (nonBlocking) {
return e.errno;
}

// Otherwise, fallthrough to polling.
}
}

// At this point we know we have to poll.
// You might wonder why we duplicate the code here instead of always using
// Allow the caller to disable polling.
// @TODO: Check if we should even poll here, or is it up to the caller to decide
// and call poll() on their own.
if (false === pollAsync) {
return ERRNO_CODES.EWOULDBLOCK;
}

// At this point we're certain we need to poll.
//
// You might wonder why we duplicate the code here instead of just reusing
// Asyncify.handleSleep(). The reason is performance. Most of the time,
// the read operation will work synchronously and won't require yielding
// back to JS. In these cases we don't want to pay the Asyncify overhead,
// save the stack, yield back to JS, restore the stack etc.
return returnCallback((wakeUp) => {
return returnCallback(async (wakeUp) => {
var retries = 0;
var interval = 50;
var timeout = 5000;
Expand All @@ -324,7 +377,7 @@ EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, si
// to, say, block the entire PHPUnit test suite without any visible
// feedback.
var maxRetries = timeout / interval;
function poll() {
while(true) {
var returnCode;
var stream;
let num;
Expand All @@ -343,28 +396,30 @@ EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, si
returnCode = e.errno;
}

const success = returnCode === 0;
const failure = (
++retries > maxRetries ||
!(fd in PHPWASM.child_proc_by_fd) ||
PHPWASM.child_proc_by_fd[fd]?.exited ||
FS.isClosed(stream)
);
// read succeeded!
if (returnCode === 0) {
HEAPU32[pnum >> 2] = num;
return wakeUp(0);
}

if (success) {
if (
// Too many retries? That's an error, too!
++retries > maxRetries ||
// Stream closed? That's an error.
!stream || FS.isClosed(stream) ||
// Error different than EWOULDBLOCK – propagate it to the caller.
returnCode !== ERRNO_CODES.EWOULDBLOCK ||
// Broken pipe
('pipe' in stream.node && stream.node.pipe.refcnt < 2)
) {
HEAPU32[pnum >> 2] = num;
wakeUp(0);
} else if (failure) {
// On failure, yield 0 bytes read to indicate EOF.
HEAPU32[pnum >> 2] = 0;
// If the failure is due to a timeout, return 0 to indicate that we
// reached EOF. Otherwise, propagate the error code.
wakeUp(returnCode === 6 ? 0 : returnCode);
} else {
setTimeout(poll, interval);
return wakeUp(returnCode);
}

// It's a blocking stream and we Blocking stream with no data available yet.
// Let's poll up to a timeout.
await new Promise(resolve => setTimeout(resolve, interval));
}
poll();
})
});
extern int __wasi_syscall_ret(__wasi_errno_t code);
Expand Down Expand Up @@ -447,27 +502,30 @@ EMSCRIPTEN_KEEPALIVE FILE *wasm_popen(const char *cmd, const char *mode)
}
else if (*mode == 'w')
{
int current_procopen_call_id = ++procopen_call_id;
char *device_path = js_create_input_device(current_procopen_call_id);
int stdin_childend = current_procopen_call_id;
fp = fopen(device_path, mode);

php_file_descriptor_t stdin_pipe[2];
php_file_descriptor_t stdout_pipe[2];
php_file_descriptor_t stderr_pipe[2];
if (0 != pipe(stdout_pipe) || 0 != pipe(stderr_pipe))
if (0 != pipe(stdout_pipe) || 0 != pipe(stderr_pipe) || 0 != pipe(stdin_pipe))
{
php_error_docref(NULL, E_WARNING, "unable to create pipe %s", strerror(errno));
errno = EINVAL;
return 0;
}

fp = fdopen(stdin_pipe[1], "w"); // or "w", depending on direction
if (!fp) {
php_error_docref(NULL, E_WARNING, "unable to create pipe %s", strerror(errno));
errno = EINVAL;
return 0;
}

int *stdin = safe_emalloc(sizeof(int), 3, 0);
int *stdout = safe_emalloc(sizeof(int), 3, 0);
int *stderr = safe_emalloc(sizeof(int), 3, 0);

stdin[0] = 0;
stdin[1] = stdin_childend;
stdin[2] = (int) NULL;
stdin[1] = stdin_pipe[0];
stdin[2] = stdin_pipe[1];

stdout[0] = 1;
stdout[1] = stdout_pipe[0];
Expand All @@ -483,7 +541,6 @@ EMSCRIPTEN_KEEPALIVE FILE *wasm_popen(const char *cmd, const char *mode)
descv[1] = stdout;
descv[2] = stderr;


// the wasm way {{{
js_open_process(
cmd,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const LibraryForFileLocking = {
F_RDLCK: 0,
F_WRLCK: 1,
F_UNLCK: 2,

lockStateToFcntl: {
shared: 0,
exclusive: 1,
Expand Down Expand Up @@ -155,6 +156,7 @@ const LibraryForFileLocking = {
SYSCALLS.varargs = varargs;

// These constants are replaced by Emscripten during the build process
const emscripten_F_SETFL = Number('{{{cDefs.F_SETFL}}}');
const emscripten_F_GETLK = Number('{{{cDefs.F_GETLK}}}');
const emscripten_F_SETLK = Number('{{{cDefs.F_SETLK}}}');
const emscripten_F_SETLKW = Number('{{{cDefs.F_SETLKW}}}');
Expand Down Expand Up @@ -577,6 +579,26 @@ const LibraryForFileLocking = {
// because it is a known errno for a failed F_SETLKW command.
return -ERRNO_CODES.EDEADLK;
}
case emscripten_F_SETFL: {
/**
* Overrides the core Emscripten implementation to reflect what
* fcntl does in linux kernel. This implementation is still missing
* a bunch of nuance, but, unlike the core Emscripten implementation,
* it overrides the stream flags while preserving non-stream flags.
*
* @see fcntl.c:
* https://github.com/torvalds/linux/blob/a79a588fc1761dc12a3064fc2f648ae66cea3c5a/fs/fcntl.c#L39
*/
const arg = varargs ? syscallGetVarargI() : 0;
const stream = SYSCALLS.getStreamFromFD(fd);

// Update the stream flags
stream.flags =
(arg & PHPWASM.SETFL_MASK) |
(stream.flags & ~PHPWASM.SETFL_MASK);

return 0;
}
default:
return _builtin_fcntl64(fd, cmd, varargs);
}
Expand Down
Loading
Loading