Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/php-wasm/compile/php/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ RUN export ASYNCIFY_IMPORTS=$'[\n\
"cli",\
"close_stmt_and_copy_errors",\
"close",\
"__stdio_close",\
"closeUnixFile",\
"compile_file",\
"createCollation",\
Expand Down
133 changes: 89 additions & 44 deletions packages/php-wasm/compile/php/php_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,14 @@ EM_JS(int, wasm_poll_socket, (php_socket_t socketd, int events, int timeout), {
* Check for socket-ness first. We don't clean up child_proc_by_fd yet and
* sometimes get duplicate entries. isSocket is more reliable out of the two –
* let's check for it first.
*
* @TODO: Remove this code branch entirely and poll sockets using the same poll()
* syscall as we use for other streams. The only reason this wasn't done is
* that the original PR was focusing on removing child process-related special
* casing and did not want to increase the risk of breaking other code.
*/
if (FS.isSocket(FS.getStream(socketd)?.node.mode)) {
const stream = FS.getStream(socketd);
if (FS.isSocket(stream?.node.mode)) {
// This is, most likely, a websocket. Let's make sure.
const sock = getSocketFromFD(socketd);
if (!sock) {
Expand Down Expand Up @@ -208,15 +214,44 @@ EM_JS(int, wasm_poll_socket, (php_socket_t socketd, int events, int timeout), {
lookingFor.add('POLLERR');
}
}
} else if (socketd in PHPWASM.child_proc_by_fd) {
// This is a child process-related socket.
const procInfo = PHPWASM.child_proc_by_fd[socketd];
if (procInfo.exited) {
wakeUp(0);
return;
}
polls.push(PHPWASM.awaitEvent(procInfo.stdout, 'data'));
} else {
} else if (stream?.stream_ops?.poll) {
if (!stream) {
wakeUp(-1);
return;
}
// Poll the stream for data.
let interrupted = false;
async function poll() {
try {
while (true) {
// Inlined ___syscall_poll
var mask = 32; // {{{ cDefine('POLLNVAL') }}};
mask = SYSCALLS.DEFAULT_POLLMASK;
if (stream.stream_ops?.poll) {
mask = stream.stream_ops.poll(stream, -1);
}

mask &= events | 8 | 16; // | {{{ cDefine('POLLERR') }}} | {{{ cDefine('POLLHUP') }}};
if (mask) {
return mask;
}
if (interrupted) {
return 0;
}
await new Promise(resolve => setTimeout(resolve, 10));
}
} catch (e) {
if (typeof FS == 'undefined' || !(e.name === 'ErrnoError')) throw e;
return -e.errno;
}
}
polls.push([
poll(),
() => {
interrupted = true;
}
]);
} else {
setTimeout(function () {
wakeUp(1);
}, timeout);
Expand Down Expand Up @@ -279,14 +314,12 @@ EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, si
const returnCallback = (resolver) => Asyncify.handleSleep(resolver);
#endif
if (Asyncify?.State?.Normal === undefined || Asyncify?.state === Asyncify?.State?.Normal) {
var returnCode;
var stream;
let num = 0;
try
{
stream = SYSCALLS.getStreamFromFD(fd);
const num = doReadv(stream, iov, iovcnt);
HEAPU32[pnum >> 2] = num;
// How many bytes did we read?
HEAPU32[pnum >> 2] = doReadv(stream, iov, iovcnt);
return 0;
}
catch (e)
Expand All @@ -296,26 +329,36 @@ EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, si
{
throw e;
}
// Only return synchronously if this isn't an asynchronous pipe.
// Error code 6 indicates EWOULDBLOCK – this is our signal to wait.
// We also need to distinguish between a process pipe and a file pipe, otherwise
// reading from an empty file would block until the timeout.
if (e.errno !== 6 || !(stream?.fd in PHPWASM.child_proc_by_fd))
{
// On failure, yield 0 bytes read to indicate EOF.
const isBlockingFdThatWaitsForData = (
// is blocking?
!(stream.flags & locking.O_NONBLOCK) &&
// is waiting for data?
e.errno === ERRNO_CODES.EWOULDBLOCK &&
// if it's a pipe, does it have a living other end?
(!('pipe' in stream.node) || stream.node.pipe.refcnt >= 2)
);
/**
* The only reason to fall through to polling is if we're processing
* a blocking pipe that's still waiting for data.
*
* In every other case, we should tell the caller we've ran into an error.
*/
if(!isBlockingFdThatWaitsForData) {
// Indicate 0 bytes read.
HEAPU32[pnum >> 2] = 0;
return returnCode
return e.errno;
}
}
}

// At this point we know we have to poll.
// You might wonder why we duplicate the code here instead of always using
// At this point we're certain we need to poll.
//
// You might wonder why we duplicate the code here instead of just reusing
// Asyncify.handleSleep(). The reason is performance. Most of the time,
// the read operation will work synchronously and won't require yielding
// back to JS. In these cases we don't want to pay the Asyncify overhead,
// save the stack, yield back to JS, restore the stack etc.
return returnCallback((wakeUp) => {
return returnCallback(async (wakeUp) => {
var retries = 0;
var interval = 50;
var timeout = 5000;
Expand All @@ -324,7 +367,7 @@ EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, si
// to, say, block the entire PHPUnit test suite without any visible
// feedback.
var maxRetries = timeout / interval;
function poll() {
while(true) {
var returnCode;
var stream;
let num;
Expand All @@ -343,28 +386,30 @@ EM_JS(__wasi_errno_t, js_fd_read, (__wasi_fd_t fd, const __wasi_iovec_t *iov, si
returnCode = e.errno;
}

const success = returnCode === 0;
const failure = (
++retries > maxRetries ||
!(fd in PHPWASM.child_proc_by_fd) ||
PHPWASM.child_proc_by_fd[fd]?.exited ||
FS.isClosed(stream)
);
// read succeeded!
if (returnCode === 0) {
HEAPU32[pnum >> 2] = num;
return wakeUp(0);
}

if (success) {
if (
// Too many retries? That's an error, too!
++retries > maxRetries ||
// Stream closed? That's an error.
!stream || FS.isClosed(stream) ||
// Error different than EWOULDBLOCK – propagate it to the caller.
returnCode !== ERRNO_CODES.EWOULDBLOCK ||
// Broken pipe
('pipe' in stream.node && stream.node.pipe.refcnt < 2)
) {
HEAPU32[pnum >> 2] = num;
wakeUp(0);
} else if (failure) {
// On failure, yield 0 bytes read to indicate EOF.
HEAPU32[pnum >> 2] = 0;
// If the failure is due to a timeout, return 0 to indicate that we
// reached EOF. Otherwise, propagate the error code.
wakeUp(returnCode === 6 ? 0 : returnCode);
} else {
setTimeout(poll, interval);
return wakeUp(returnCode);
}

// It's a blocking stream and we Blocking stream with no data available yet.
// Let's poll up to a timeout.
await new Promise(resolve => setTimeout(resolve, interval));
}
poll();
})
});
extern int __wasi_syscall_ret(__wasi_errno_t code);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ const LibraryForFileLocking = {
F_RDLCK: 0,
F_WRLCK: 1,
F_UNLCK: 2,
/**
* @see fcntl.c:
* https://github.com/torvalds/linux/blob/a79a588fc1761dc12a3064fc2f648ae66cea3c5a/fs/fcntl.c#L37
*/
O_APPEND: Number('{{{cDefs.O_APPEND}}}'),
O_NONBLOCK: Number('{{{cDefs.O_NONBLOCK}}}'),
SETFL_MASK:
Number('{{{cDefs.O_APPEND}}}') |
Number('{{{cDefs.O_NONBLOCK}}}')
// These macros are not defined in Emscripten at the time of writing:
// emscripten_O_NDELAY |
// emscripten_O_DIRECT |
// emscripten_O_NOATIME
,
lockStateToFcntl: {
shared: 0,
exclusive: 1,
Expand All @@ -40,7 +54,10 @@ const LibraryForFileLocking = {
}

// Handle PROXYFS nodes which wrap other nodes.
if (!node?.mount?.opts?.fs?.lookupPath || !node?.mount?.type?.realPath) {
if (
!node?.mount?.opts?.fs?.lookupPath ||
!node?.mount?.type?.realPath
) {
return false;
}

Expand All @@ -50,7 +67,8 @@ const LibraryForFileLocking = {
}
const vfsPath = node.mount.type.realPath(node);
try {
const underlyingNode = node.mount.opts.fs.lookupPath(vfsPath)?.node;
const underlyingNode =
node.mount.opts.fs.lookupPath(vfsPath)?.node;
return !!underlyingNode?.isSharedFS;
} catch (e) {
return false;
Expand Down Expand Up @@ -116,11 +134,14 @@ const LibraryForFileLocking = {
SYSCALLS.varargs = varargs;

// These constants are replaced by Emscripten during the build process
const emscripten_F_SETFL = Number('{{{cDefs.F_SETFL}}}');
const emscripten_F_GETLK = Number('{{{cDefs.F_GETLK}}}');
const emscripten_F_SETLK = Number('{{{cDefs.F_SETLK}}}');
const emscripten_F_SETLKW = Number('{{{cDefs.F_SETLKW}}}');
const emscripten_SEEK_SET = Number('{{{cDefs.SEEK_SET}}}');



// NOTE: With the exception of l_type, these offsets are not exposed to
// JS by Emscripten, so we hardcode them here.
const emscripten_flock_l_type_offset = 0;
Expand Down Expand Up @@ -511,6 +532,38 @@ const LibraryForFileLocking = {
// because it is a known errno for a failed F_SETLKW command.
return -ERRNO_CODES.EDEADLK;
}
case emscripten_F_SETFL: {
/**
* Overrides the core Emscripten implementation to reflect what
* fcntl does in linux kernel. This implementation is still missing
* a bunch of nuance, but, unlike the core Emscripten implementation,
* it overrides the stream flags while preserving non-stream flags.
*
* @see fcntl.c:
* https://github.com/torvalds/linux/blob/a79a588fc1761dc12a3064fc2f648ae66cea3c5a/fs/fcntl.c#L39
*/
const arg = SYSCALLS.get();
const stream = SYSCALLS.getStreamFromFD(fd);

// Get current flags
const currentFlags = stream.flags;

// Required for strict SunOS emulation
if (emscripten_O_NONBLOCK !== emscripten_O_NDELAY) {
if (arg & emscripten_O_NDELAY) {
arg |= emscripten_O_NONBLOCK;
}
}

// Pipe packetized mode is controlled by O_DIRECT flag
// We don't have S_ISFIFO or FMODE_CAN_ODIRECT checks in our implementation
// so we skip this validation

// Update the stream flags
stream.flags = (arg & locking.SETFL_MASK) | (currentFlags & ~locking.SETFL_MASK);

return 0;
}
default:
return _builtin_fcntl64(fd, cmd, varargs);
}
Expand Down
Loading
Loading