X Tutup
var net = require('net'); var tls = require('tls'); var http = require('http'); var EventEmitter = require('events').EventEmitter; var protocol = require('koding-dnode-protocol'); // koding has own implementation of dnode-protocol. var Lazy = require('lazy'); var SocketIO = require('./lib/stream_socketio'); exports = module.exports = dnode; function dnode (wrapper) { if (!(this instanceof dnode)) return new dnode(wrapper); this.proto = protocol(wrapper); this.stack = []; this.streams = []; return this; } dnode.prototype = new EventEmitter; dnode.prototype.use = function (middleware) { this.stack.push(middleware); return this; }; dnode.prototype.connect = function () { var params = protocol.parseArgs(arguments); var stream = params.stream; var client = null; var self = this; if (params.port) { params.host = params.host || '127.0.0.1'; if (params.key) { var options = { key: params.key, cert: params.cert, ca: params.ca, requestCert: params.requestCert, rejectUnauthorized: params.rejectUnauthorized }; stream = tls.connect(params.port, params.host, options, function() { attachDnode(); }); } else { stream = net.createConnection(params.port, params.host); stream.on('connect', function() { attachDnode(); }); } } else if (params.path) { stream = net.createConnection(params.path); stream.on('connect', function() { attachDnode(); }); } else { attachDnode(); } stream.remoteAddress = params.host; stream.remotePort = params.port; var args = arguments; if (params.reconnect) { stream.on('error', (function (err) { if (err.code === 'ECONNREFUSED' || err.code === 'ENOENT') { if (client) client.emit('refused'); setTimeout((function () { if (client) client.emit('reconnect'); dnode.prototype.connect.apply(this, args); }).bind(this), params.reconnect); } else if (client) client.emit('error', err) else this.emit('error', err) }).bind(this)); stream.once('end', (function () { if (!params.reconnect) return; client.emit('drop'); setTimeout((function () { if (!params.reconnect) return; client.emit('reconnect'); dnode.prototype.connect.apply(this, args); }).bind(this), params.reconnect); }).bind(this)); } else { stream.on('error', (function (err) { if (client) client.emit('error', err) else this.emit('error', err) }).bind(this)); } function attachDnode() { if (!stream) { self.emit('error', new Error('Could not create a stream with this information ' + JSON.stringify(params))) return null } client = createClient(self.proto, stream); client.end = function () { if (params.reconnect) params.reconnect = 0; stream.end(); }; self.stack.forEach(function (middleware) { middleware.call(client.instance, client.remote, client); }); if (params.block) { client.on('remote', function () { params.block.call(client.instance, client.remote, client); }); } process.nextTick(function () { if (client.listeners('error').length === 0) { // default error handler to keep everything from crashing client.on('error', function (err) { console.error(err && err.stack || err); }) } }); client.start(); }; this.streams.push(stream); return this; }; dnode.prototype.listen = function () { var self = this; var params = protocol.parseArgs(arguments); var server = params.server; if (params.port) { if (params.key) { var options = { key: params.key, cert: params.cert, ca: params.ca, requestCert: params.requestCert, rejectUnauthorized: params.rejectUnauthorized }; server = tls.createServer(options); server.on('error', this.emit.bind(this, 'error')); if (params.host) { server.listen( params.port, params.host, this.emit.bind(this, 'ready') ); } else { server.listen( params.port, this.emit.bind(this, 'ready') ); } } else { server = net.createServer(); server.on('error', this.emit.bind(this, 'error')); server.listen( params.port, params.host, this.emit.bind(this, 'ready') ); } } else if (params.path) { server = net.createServer(); server.on('error', this.emit.bind(this, 'error')); server.listen( params.path, this.emit.bind(this, 'ready') ); } else if (server && ( server instanceof http.Server || server.hasOwnProperty('httpAllowHalfOpen') || params.webserver )) { // a webserver, use socket.io server = SocketIO( server || params.webserver, params.mount === undefined ? '/dnode.js' : params.mount, params.io || {} ); } if (!server) { this.emit('error', new Error('Not sure how to fire up this listener')); } var clients = {}; var listenFor = server instanceof tls.Server ? 'secureConnection' : 'connection' ; server.on(listenFor, function (stream) { var client = createClient(self.proto, stream); clients[client.id] = client; self.stack.forEach(function (middleware) { middleware.call(client.instance, client.remote, client); }); if (params.block) { client.on('remote', function () { params.block.call(client.instance, client.remote, client); }); } client.start(); }); this.server = server; server.on('close', this.emit.bind(this, 'close')); return this; }; function createClient (proto, stream) { var client = proto.create(); process.nextTick(function () { if (client.listeners('error').length === 0) { // default error handler to keep everything from crashing client.on('error', function (err) { console.error(err && err.stack || err); }) } }); client.stream = stream; client.end = stream.end.bind(stream); client.destroy = stream.destroy.bind(stream); stream.on('end', client.emit.bind(client, 'end')); stream.on('disconnect', client.emit.bind(client, 'disconnect')); stream.on('close', client.emit.bind(client, 'close')); client.on('request', function (req) { if (stream.writable) { stream.write(JSON.stringify(req) + '\n'); } else { client.emit('dropped', req); } }); Lazy(stream).lines .map(String) .forEach(client.parse) ; return client; } dnode.prototype.end = function () { Object.keys(this.proto.sessions) .forEach((function (id) { this.proto.sessions[id].stream.end() }).bind(this)) ; this.emit('end'); }; dnode.prototype.close = function () { var self = this try { self.server.close(); } catch (e) { if (e.message === 'Not running') { self.server.emit('close'); } else { self.server.emit('error', e); } } }; dnode.connect = function () { var d = dnode(); return d.connect.apply(d, arguments); }; dnode.listen = function () { var d = dnode(); return d.listen.apply(d, arguments); };
X Tutup