diff options
Diffstat (limited to 'toolkit/modules/subprocess/subprocess_worker_unix.js')
-rw-r--r-- | toolkit/modules/subprocess/subprocess_worker_unix.js | 609 |
1 files changed, 609 insertions, 0 deletions
diff --git a/toolkit/modules/subprocess/subprocess_worker_unix.js b/toolkit/modules/subprocess/subprocess_worker_unix.js new file mode 100644 index 0000000000..839402deb1 --- /dev/null +++ b/toolkit/modules/subprocess/subprocess_worker_unix.js @@ -0,0 +1,609 @@ +/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */ +/* vim: set sts=2 sw=2 et tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +"use strict"; + +/* exported Process */ +/* globals BaseProcess, BasePipe */ + +importScripts("resource://gre/modules/subprocess/subprocess_shared.js", + "resource://gre/modules/subprocess/subprocess_shared_unix.js", + "resource://gre/modules/subprocess/subprocess_worker_common.js"); + +const POLL_TIMEOUT = 5000; + +let io; + +let nextPipeId = 0; + +class Pipe extends BasePipe { + constructor(process, fd) { + super(); + + this.process = process; + this.fd = fd; + this.id = nextPipeId++; + } + + get pollEvents() { + throw new Error("Not implemented"); + } + + /** + * Closes the file descriptor. + * + * @param {boolean} [force=false] + * If true, the file descriptor is closed immediately. If false, the + * file descriptor is closed after all current pending IO operations + * have completed. + * + * @returns {Promise<void>} + * Resolves when the file descriptor has been closed. + */ + close(force = false) { + if (!force && this.pending.length) { + this.closing = true; + return this.closedPromise; + } + + for (let {reject} of this.pending) { + let error = new Error("File closed"); + error.errorCode = SubprocessConstants.ERROR_END_OF_FILE; + reject(error); + } + this.pending.length = 0; + + if (!this.closed) { + this.fd.dispose(); + + this.closed = true; + this.resolveClosed(); + + io.pipes.delete(this.id); + io.updatePollFds(); + } + return this.closedPromise; + } + + /** + * Called when an error occurred while polling our file descriptor. + */ + onError() { + this.close(true); + this.process.wait(); + } +} + +class InputPipe extends Pipe { + /** + * A bit mask of poll() events which we currently wish to be notified of on + * this file descriptor. + */ + get pollEvents() { + if (this.pending.length) { + return LIBC.POLLIN; + } + return 0; + } + + /** + * Asynchronously reads at most `length` bytes of binary data from the file + * descriptor into an ArrayBuffer of the same size. Returns a promise which + * resolves when the operation is complete. + * + * @param {integer} length + * The number of bytes to read. + * + * @returns {Promise<ArrayBuffer>} + */ + read(length) { + if (this.closing || this.closed) { + throw new Error("Attempt to read from closed pipe"); + } + + return new Promise((resolve, reject) => { + this.pending.push({resolve, reject, length}); + io.updatePollFds(); + }); + } + + /** + * Synchronously reads at most `count` bytes of binary data into an + * ArrayBuffer, and returns that buffer. If no data can be read without + * blocking, returns null instead. + * + * @param {integer} count + * The number of bytes to read. + * + * @returns {ArrayBuffer|null} + */ + readBuffer(count) { + let buffer = new ArrayBuffer(count); + + let read = +libc.read(this.fd, buffer, buffer.byteLength); + if (read < 0 && ctypes.errno != LIBC.EAGAIN) { + this.onError(); + } + + if (read <= 0) { + return null; + } + + if (read < buffer.byteLength) { + return ArrayBuffer.transfer(buffer, read); + } + + return buffer; + } + + /** + * Called when one of the IO operations matching the `pollEvents` mask may be + * performed without blocking. + * + * @returns {boolean} + * True if any data was successfully read. + */ + onReady() { + let result = false; + let reads = this.pending; + while (reads.length) { + let {resolve, length} = reads[0]; + + let buffer = this.readBuffer(length); + if (buffer) { + result = true; + this.shiftPending(); + resolve(buffer); + } else { + break; + } + } + + if (reads.length == 0) { + io.updatePollFds(); + } + return result; + } +} + +class OutputPipe extends Pipe { + /** + * A bit mask of poll() events which we currently wish to be notified of on + * this file discriptor. + */ + get pollEvents() { + if (this.pending.length) { + return LIBC.POLLOUT; + } + return 0; + } + + /** + * Asynchronously writes the given buffer to our file descriptor, and returns + * a promise which resolves when the operation is complete. + * + * @param {ArrayBuffer} buffer + * The buffer to write. + * + * @returns {Promise<integer>} + * Resolves to the number of bytes written when the operation is + * complete. + */ + write(buffer) { + if (this.closing || this.closed) { + throw new Error("Attempt to write to closed pipe"); + } + + return new Promise((resolve, reject) => { + this.pending.push({resolve, reject, buffer, length: buffer.byteLength}); + io.updatePollFds(); + }); + } + + /** + * Attempts to synchronously write the given buffer to our file descriptor. + * Writes only as many bytes as can be written without blocking, and returns + * the number of byes successfully written. + * + * Closes the file descriptor if an IO error occurs. + * + * @param {ArrayBuffer} buffer + * The buffer to write. + * + * @returns {integer} + * The number of bytes successfully written. + */ + writeBuffer(buffer) { + let bytesWritten = libc.write(this.fd, buffer, buffer.byteLength); + + if (bytesWritten < 0 && ctypes.errno != LIBC.EAGAIN) { + this.onError(); + } + + return bytesWritten; + } + + /** + * Called when one of the IO operations matching the `pollEvents` mask may be + * performed without blocking. + */ + onReady() { + let writes = this.pending; + while (writes.length) { + let {buffer, resolve, length} = writes[0]; + + let written = this.writeBuffer(buffer); + + if (written == buffer.byteLength) { + resolve(length); + this.shiftPending(); + } else if (written > 0) { + writes[0].buffer = buffer.slice(written); + } else { + break; + } + } + + if (writes.length == 0) { + io.updatePollFds(); + } + } +} + +class Signal { + constructor(fd) { + this.fd = fd; + } + + cleanup() { + libc.close(this.fd); + this.fd = null; + } + + get pollEvents() { + return LIBC.POLLIN; + } + + /** + * Called when an error occurred while polling our file descriptor. + */ + onError() { + io.shutdown(); + } + + /** + * Called when one of the IO operations matching the `pollEvents` mask may be + * performed without blocking. + */ + onReady() { + let buffer = new ArrayBuffer(16); + let count = +libc.read(this.fd, buffer, buffer.byteLength); + if (count > 0) { + io.messageCount += count; + } + } +} + +class Process extends BaseProcess { + /** + * Each Process object opens an additional pipe from the target object, which + * will be automatically closed when the process exits, but otherwise + * carries no data. + * + * This property contains a bit mask of poll() events which we wish to be + * notified of on this descriptor. We're not expecting any input from this + * pipe, but we need to poll for input until the process exits in order to be + * notified when the pipe closes. + */ + get pollEvents() { + if (this.exitCode === null) { + return LIBC.POLLIN; + } + return 0; + } + + /** + * Kills the process with the given signal. + * + * @param {integer} signal + */ + kill(signal) { + libc.kill(this.pid, signal); + this.wait(); + } + + /** + * Initializes the IO pipes for use as standard input, output, and error + * descriptors in the spawned process. + * + * @param {object} options + * The Subprocess options object for this process. + * @returns {unix.Fd[]} + * The array of file descriptors belonging to the spawned process. + */ + initPipes(options) { + let stderr = options.stderr; + + let our_pipes = []; + let their_pipes = new Map(); + + let pipe = input => { + let fds = ctypes.int.array(2)(); + + let res = libc.pipe(fds); + if (res == -1) { + throw new Error("Unable to create pipe"); + } + + fds = Array.from(fds, unix.Fd); + + if (input) { + fds.reverse(); + } + + if (input) { + our_pipes.push(new InputPipe(this, fds[1])); + } else { + our_pipes.push(new OutputPipe(this, fds[1])); + } + + libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC); + libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC); + libc.fcntl(fds[1], LIBC.F_SETFL, LIBC.O_NONBLOCK); + + return fds[0]; + }; + + their_pipes.set(0, pipe(false)); + their_pipes.set(1, pipe(true)); + + if (stderr == "pipe") { + their_pipes.set(2, pipe(true)); + } else if (stderr == "stdout") { + their_pipes.set(2, their_pipes.get(1)); + } + + // Create an additional pipe that we can use to monitor for process exit. + their_pipes.set(3, pipe(true)); + this.fd = our_pipes.pop().fd; + + this.pipes = our_pipes; + + return their_pipes; + } + + spawn(options) { + let {command, arguments: args} = options; + + let argv = this.stringArray(args); + let envp = this.stringArray(options.environment); + + let actions = unix.posix_spawn_file_actions_t(); + let actionsp = actions.address(); + + let fds = this.initPipes(options); + + let cwd; + try { + if (options.workdir) { + cwd = ctypes.char.array(LIBC.PATH_MAX)(); + libc.getcwd(cwd, cwd.length); + + if (libc.chdir(options.workdir) < 0) { + throw new Error(`Unable to change working directory to ${options.workdir}`); + } + } + + libc.posix_spawn_file_actions_init(actionsp); + for (let [i, fd] of fds.entries()) { + libc.posix_spawn_file_actions_adddup2(actionsp, fd, i); + } + + let pid = unix.pid_t(); + let rv = libc.posix_spawn(pid.address(), command, actionsp, null, argv, envp); + + if (rv != 0) { + for (let pipe of this.pipes) { + pipe.close(); + } + throw new Error(`Failed to execute command "${command}"`); + } + + this.pid = pid.value; + } finally { + libc.posix_spawn_file_actions_destroy(actionsp); + + this.stringArrays.length = 0; + + if (cwd) { + libc.chdir(cwd); + } + for (let fd of new Set(fds.values())) { + fd.dispose(); + } + } + } + + /** + * Called when input is available on our sentinel file descriptor. + * + * @see pollEvents + */ + onReady() { + // We're not actually expecting any input on this pipe. If we get any, we + // can't poll the pipe any further without reading it. + if (this.wait() == undefined) { + this.kill(9); + } + } + + /** + * Called when an error occurred while polling our sentinel file descriptor. + * + * @see pollEvents + */ + onError() { + this.wait(); + } + + /** + * Attempts to wait for the process's exit status, without blocking. If + * successful, resolves the `exitPromise` to the process's exit value. + * + * @returns {integer|null} + * The process's exit status, if it has already exited. + */ + wait() { + if (this.exitCode !== null) { + return this.exitCode; + } + + let status = ctypes.int(); + + let res = libc.waitpid(this.pid, status.address(), LIBC.WNOHANG); + if (res == this.pid) { + let sig = unix.WTERMSIG(status.value); + if (sig) { + this.exitCode = -sig; + } else { + this.exitCode = unix.WEXITSTATUS(status.value); + } + + this.fd.dispose(); + io.updatePollFds(); + this.resolveExit(this.exitCode); + return this.exitCode; + } + } +} + +io = { + pollFds: null, + pollHandlers: null, + + pipes: new Map(), + + processes: new Map(), + + messageCount: 0, + + running: true, + + init(details) { + this.signal = new Signal(details.signalFd); + this.updatePollFds(); + + setTimeout(this.loop.bind(this), 0); + }, + + shutdown() { + if (this.running) { + this.running = false; + + this.signal.cleanup(); + this.signal = null; + + self.postMessage({msg: "close"}); + self.close(); + } + }, + + getPipe(pipeId) { + let pipe = this.pipes.get(pipeId); + + if (!pipe) { + let error = new Error("File closed"); + error.errorCode = SubprocessConstants.ERROR_END_OF_FILE; + throw error; + } + return pipe; + }, + + getProcess(processId) { + let process = this.processes.get(processId); + + if (!process) { + throw new Error(`Invalid process ID: ${processId}`); + } + return process; + }, + + updatePollFds() { + let handlers = [this.signal, + ...this.pipes.values(), + ...this.processes.values()]; + + handlers = handlers.filter(handler => handler.pollEvents); + + let pollfds = unix.pollfd.array(handlers.length)(); + + for (let [i, handler] of handlers.entries()) { + let pollfd = pollfds[i]; + + pollfd.fd = handler.fd; + pollfd.events = handler.pollEvents; + pollfd.revents = 0; + } + + this.pollFds = pollfds; + this.pollHandlers = handlers; + }, + + loop() { + this.poll(); + if (this.running) { + setTimeout(this.loop.bind(this), 0); + } + }, + + poll() { + let handlers = this.pollHandlers; + let pollfds = this.pollFds; + + let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT; + let count = libc.poll(pollfds, pollfds.length, timeout); + + for (let i = 0; count && i < pollfds.length; i++) { + let pollfd = pollfds[i]; + if (pollfd.revents) { + count--; + + let handler = handlers[i]; + try { + let success = false; + if (pollfd.revents & handler.pollEvents) { + success = handler.onReady(); + } + // Only call the error handler in this iteration if we didn't also + // have a success. This is necessary because Linux systems set POLLHUP + // on a pipe when it's closed but there's still buffered data to be + // read, and Darwin sets POLLIN and POLLHUP on a closed pipe, even + // when there's no data to be read. + if (!success && (pollfd.revents & (LIBC.POLLERR | LIBC.POLLHUP | LIBC.POLLNVAL))) { + handler.onError(); + } + } catch (e) { + console.error(e); + debug(`Worker error: ${e} :: ${e.stack}`); + handler.onError(); + } + + pollfd.revents = 0; + } + } + }, + + addProcess(process) { + this.processes.set(process.id, process); + + for (let pipe of process.pipes) { + this.pipes.set(pipe.id, pipe); + } + }, + + cleanupProcess(process) { + this.processes.delete(process.id); + }, +}; |