// plex-web-downloader/node_modules/cluster/lib/worker.js
// 233 lines, 4.5 KiB, JavaScript

/*!
* Cluster - Worker
* Copyright(c) 2011 LearnBoost <dev@learnboost.com>
* MIT Licensed
*/
/**
* Module dependencies.
*/
var EventEmitter = require('events').EventEmitter
, spawn = require('child_process').spawn
, binding = process.binding('net')
, utils = require('./utils')
, net = require('net');
/**
* Node binary.
*
* Path of the executable running the current process, as reported by
* `process.execPath`. `Worker.prototype.spawn()` passes it to
* `child_process.spawn()` as the command used to launch a worker.
*/
var node = process.execPath;
/**
* Initialize a new `Worker` with the given `master`.
*
* Signals:
*
* - `SIGINT` immediately exit
* - `SIGTERM` immediately exit
* - `SIGQUIT` graceful exit
*
* @param {Master} master
* @api private
*/
var Worker = module.exports = function Worker(master) {
this.master = master;
this.server = master.server;
};
/**
* Inherit from `EventEmitter.prototype`.
*
* Chains `Worker.prototype` to `EventEmitter.prototype` so that worker
* instances can use the emitter methods; `destroy()` relies on `emit()`.
*/
Worker.prototype.__proto__ = EventEmitter.prototype;
/**
* Worker is a receiver.
*
* Applies the `./mixins/receiver` module to `Worker.prototype`.
* NOTE(review): presumably this is what supplies the `frame()` method
* that `start()` binds to incoming stdin data -- confirm in the mixin.
*/
require('./mixins/receiver')(Worker.prototype);
/**
 * Start worker.
 *
 * Performed in order:
 *
 *   - wraps `master.call` so that the worker `id` is prepended to the
 *     arguments, unless the first argument is `false`
 *   - opens fd 0 as a unix socket (`this.stdin`) and feeds incoming
 *     ascii data to `this.frame()` (not defined in this file;
 *     presumably supplied by the receiver mixin)
 *   - when the server exposes `listenFD`, hands descriptors received
 *     on stdin to it and, once the server is listening, switches the
 *     process to the configured `group` and `user`
 *   - installs the signal handlers: `SIGINT` / `SIGTERM` destroy,
 *     `SIGQUIT` close
 *   - on the next tick, installs a fallback `uncaughtException`
 *     handler when nobody else registered one; it logs the failure,
 *     reports it to the master as `workerException` and then destroys
 *     the worker
 *
 * @api private
 */

Worker.prototype.start = function(){
  var self = this
    , call = this.master.call;

  // proxy to provide worker id
  this.master.call = function(){
    var args = utils.toArray(arguments);
    // Allow calling master methods that
    // don't take worker as first argument
    if (false !== args[0]) args.unshift(self.id);
    return call.apply(this, args);
  };

  // stdin
  this.stdin = new net.Socket(0, 'unix');
  this.stdin.setEncoding('ascii');
  this.stdin.on('data', this.frame.bind(this));
  this.stdin.resume();

  // demote usr/group
  if (this.server && this.server.listenFD) {
    this.server.on('listening', function(){
      // group is changed before user: once the user id has been
      // dropped the process typically may no longer change its group
      var group = self.options.group;
      if (group) process.setgid(group);
      var user = self.options.user;
      if (user) process.setuid(user);
    });

    // descriptors received over stdin are handed to the server
    this.stdin.on('fd', this.server.listenFD.bind(this.server));
  }

  // signal handlers
  process.on('SIGINT', this.destroy.bind(this));
  process.on('SIGTERM', this.destroy.bind(this));
  process.on('SIGQUIT', this.close.bind(this));

  // conditionally handle uncaughtException; deferred a tick so that
  // listeners registered right after start() are taken into account
  process.nextTick(function(){
    if (!process.listeners('uncaughtException').length) {
      process.on('uncaughtException', function(err){
        // stderr for logs. Anything can be thrown: fall back to the
        // raw value so that `throw 'msg'` is still logged and
        // `throw null` does not raise a TypeError inside this handler.
        console.error((err && (err.stack || err.message)) || err);

        // report exception
        self.master.call('workerException', err);

        // exit
        process.nextTick(function(){
          self.destroy();
        });
      });
    }
  });
};
/**
 * Handle the connect event: remember the worker `id` and `options`,
 * cache `options.timeout`, set the process title and acknowledge the
 * connection to the master.
 *
 * The title comes from the `'worker title'` option with the first
 * `{n}` placeholder replaced by `id`.
 *
 * @param {String} id
 * @param {Object} options
 * @api private
 */

Worker.prototype.connect = function(id, options){
  var worker = this
    , title;

  worker.options = options;
  worker.id = id;
  worker.timeout = options.timeout;

  // title
  title = options['worker title'].replace('{n}', id);
  process.title = title;

  // notify master of connection
  worker.master.call('connect');
};
/**
 * Immediate shutdown: emit `close`, then exit the process on the
 * next tick.
 *
 * @api private
 */

Worker.prototype.destroy = function(){
  var exit;

  // listeners run synchronously, before the exit is scheduled
  this.emit('close');

  exit = process.exit;
  process.nextTick(exit);
};
/**
 * Perform graceful shutdown.
 *
 * Without a server, or with no open connections, the worker is
 * destroyed right away. Otherwise the server watcher is stopped and,
 * every 2000ms, the connection count is reported to the master as
 * `workerWaiting`; the worker is destroyed once that count is falsy.
 * When a `timeout` is set, `workerTimeout` is reported after that many
 * milliseconds and the worker is destroyed regardless.
 *
 * @api private
 */

Worker.prototype.close = function(){
  var worker = this
    , srv = worker.server
    , limit;

  // nothing to wait for
  if (!srv || !srv.connections) {
    worker.destroy();
    return;
  }

  // stop accepting
  srv.watcher.stop();

  // check pending connections
  setInterval(function poll(){
    worker.master.call('workerWaiting', srv.connections);
    if (!srv.connections) worker.destroy();
  }, 2000);

  // timeout
  limit = worker.timeout;
  if (limit) {
    setTimeout(function expire(){
      worker.master.call('workerTimeout', worker.timeout);
      worker.destroy();
    }, limit);
  }
};
/**
 * Spawn the worker process with the given `id`.
 *
 * A socket pair is created; its first descriptor is handed to the
 * child ahead of the master's `customFds`, the second one is wrapped
 * in `this.sock` on the master side. The child runs `master.cmd` with
 * the node binary and a copy of the current environment extended with
 * `CLUSTER_WORKER=<id>`.
 *
 * @param {Number} id
 * @return {Worker} for chaining
 * @api private
 */

Worker.prototype.spawn = function(id){
  var pair = binding.socketpair();
  var childFds = [pair[0]].concat(this.master.customFds);
  var childEnv = {};
  var name;

  // copy the environment so the parent's stays untouched
  for (name in process.env) {
    childEnv[name] = process.env[name];
  }
  childEnv.CLUSTER_WORKER = id;
  this.id = id;

  // spawn worker process
  this.proc = spawn(node, this.master.cmd, {
      customFds: childFds
    , env: childEnv
  });

  // unix domain socket for IPC + fd passing
  this.sock = new net.Socket(pair[1], 'unix');

  // saving file descriptors for later use
  this.fds = pair;

  return this;
};
/**
 * Invoke the worker's `method` (called from Master).
 *
 * The method name and the remaining arguments are framed with
 * `utils.frame()` and written to the worker socket as ascii.
 *
 * @param {String} method
 * @param {...} args
 * @api private
 */

Worker.prototype.call = function(method){
  // key order (args, then method) is kept as-is for the framed payload
  var message = {
      args: utils.toArray(arguments, 1)
    , method: method
  };
  var payload = utils.frame(message);

  this.sock.write(payload, 'ascii');
};