diff --git a/src/applications/notification/client/PhabricatorNotificationClient.php b/src/applications/notification/client/PhabricatorNotificationClient.php --- a/src/applications/notification/client/PhabricatorNotificationClient.php +++ b/src/applications/notification/client/PhabricatorNotificationClient.php @@ -2,7 +2,7 @@ final class PhabricatorNotificationClient { - const EXPECT_VERSION = 5; + const EXPECT_VERSION = 6; public static function getServerStatus() { $uri = PhabricatorEnv::getEnvConfig('notification.server-uri'); diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js --- a/support/aphlict/server/aphlict_server.js +++ b/support/aphlict/server/aphlict_server.js @@ -25,10 +25,10 @@ function parse_command_line_arguments(argv) { var config = { - port : 22280, - admin : 22281, - host : '127.0.0.1', - user : null, + port: 22280, + admin: 22281, + host: '127.0.0.1', + user: null, log: '/var/log/aphlict.log' }; @@ -36,10 +36,10 @@ var arg = argv[ii]; var matches = arg.match(/^--([^=]+)=(.*)$/); if (!matches) { - throw new Error("Unknown argument '"+arg+"'!"); + throw new Error("Unknown argument '" + arg + "'!"); } if (!(matches[1] in config)) { - throw new Error("Unknown argument '"+matches[1]+"'!"); + throw new Error("Unknown argument '" + matches[1] + "'!"); } config[matches[1]] = matches[2]; } @@ -52,19 +52,19 @@ if (process.getuid() !== 0) { console.log( - "ERROR: "+ - "This server must be run as root because it needs to bind to privileged "+ - "port 843 to start a Flash policy server. It will downgrade to run as a "+ - "less-privileged user after binding if you pass a user in the command "+ + "ERROR: " + + "This server must be run as root because it needs to bind to privileged " + + "port 843 to start a Flash policy server. It will downgrade to run as a " + + "less-privileged user after binding if you pass a user in the command " + "line arguments with '--user=alincoln'."); process.exit(1); } var net = require('net'); -var http = require('http'); +var http = require('http'); var url = require('url'); -process.on('uncaughtException', function (err) { +process.on('uncaughtException', function(err) { debug.log("\n<<< UNCAUGHT EXCEPTION! >>>\n\n" + err); process.exit(1); }); @@ -82,6 +82,51 @@ listener.getDescription(), socket.remoteAddress); + var buffer = new Buffer([]); + var length = 0; + + socket.on('data', function(data) { + var buf = new Buffer(data); + + if (!length) { + length = buf.readUInt16BE(); + buf = buf.slice(2); + } + + buffer = buffer.concat(buf); + if (buffer.length < length) { + // read more bytes + } + + var message = JSON.parse(buffer.toString('utf8', 0, length)); + buffer = buffer.slice(length); + + debug.log('<%s> Received data: %s', + listener.getDescription(), + JSON.stringify(message)); + + switch (message.command) { + case 'subscribe': + debug.log( + '<%s> Subscribed to: %s', + listener.getDescription(), + JSON.stringify(message.data)); + listener.subscribe(message.data); + break; + + case 'unsubscribe': + debug.log( + '<%s> Unsubscribed from: %s', + listener.getDescription(), + JSON.stringify(message.data)); + listener.unsubscribe(message.data); + break; + + default: + debug.log(' Unrecognized command.', listener.getDescription()); + } + }); + socket.on('close', function() { clients.removeListener(listener); debug.log('<%s> Disconnected', listener.getDescription()); @@ -95,7 +140,7 @@ debug.log('<%s> Ended Connection', listener.getDescription()); }); - socket.on('error', function (e) { + socket.on('error', function(e) { debug.log('<%s> Error: %s', listener.getDescription(), e); }); @@ -107,22 +152,22 @@ var start_time = new Date().getTime(); var receive_server = http.createServer(function(request, response) { - response.writeHead(200, {'Content-Type' : 'text/plain'}); + response.writeHead(200, {'Content-Type': 'text/plain'}); // Publishing a notification. if (request.method == 'POST') { var body = ''; - request.on('data', function (data) { + request.on('data', function(data) { body += data; }); - request.on('end', function () { + request.on('end', function() { ++messages_in; var msg = JSON.parse(body); debug.log('notification: ' + JSON.stringify(msg)); - broadcast(msg.data); + transmit(msg.data, msg.subscribers); response.end(); }); } else if (request.url == '/status/') { @@ -139,7 +184,7 @@ 'messages.in': messages_in, 'messages.out': messages_out, 'log': config.log, - 'version': 5 + 'version': 6 }; response.write(JSON.stringify(status)); @@ -153,10 +198,14 @@ }).listen(config.admin, config.host); -function broadcast(data) { - var listeners = clients.getListeners(); - for (var id in listeners) { - var listener = listeners[id]; +function transmit(data, subscribers) { + var listeners = clients.getListeners().filter(function() { + return this.subscribed(subscribers); + }); + + for (var i = 0; i < listeners.length; i++) { + var listener = listeners[i]; + try { listener.writeMessage(data); diff --git a/support/aphlict/server/lib/AphlictFlashPolicyServer.js b/support/aphlict/server/lib/AphlictFlashPolicyServer.js --- a/support/aphlict/server/lib/AphlictFlashPolicyServer.js +++ b/support/aphlict/server/lib/AphlictFlashPolicyServer.js @@ -17,12 +17,12 @@ _accessPort: null, _debug: null, - setDebugLog : function(log) { + setDebugLog: function(log) { this._debug = log; return this; }, - setAccessPort : function(port) { + setAccessPort: function(port) { this._accessPort = port; return this; }, diff --git a/support/aphlict/server/lib/AphlictListener.js b/support/aphlict/server/lib/AphlictListener.js --- a/support/aphlict/server/lib/AphlictListener.js +++ b/support/aphlict/server/lib/AphlictListener.js @@ -7,22 +7,54 @@ }, members : { - _id : null, - _socket : null, + _id: null, + _socket: null, + _subscriptions: {}, - getID : function() { + getID: function() { return this._id; }, - getSocket : function() { + subscribe: function(phids) { + for (var i = 0; i < phids.length; i++) { + var phid = phids[i]; + + if (!(phid in this._subscriptions)) { + this._subscriptions[phid] = true; + } + } + + return this; + }, + + unsubscribe: function(phids) { + for (var i = 0; i < phids.length; i++) { + var phid = phids[i]; + + if (phid in this._subscriptions) { + delete this._subscriptions[phid]; + } + } + + return this; + }, + + isSubscribedToAny: function(phids) { + var intersection = phids.filter(function(phid) { + return phid in this._subscriptions; + }); + return intersection.length > 0; + }, + + getSocket: function() { return this._socket; }, - getDescription : function() { + getDescription: function() { return 'Listener/' + this.getID(); }, - writeMessage : function(message) { + writeMessage: function(message) { var serial = JSON.stringify(message); var length = Buffer.byteLength(serial, 'utf8'); diff --git a/support/aphlict/server/lib/AphlictListenerList.js b/support/aphlict/server/lib/AphlictListenerList.js --- a/support/aphlict/server/lib/AphlictListenerList.js +++ b/support/aphlict/server/lib/AphlictListenerList.js @@ -2,17 +2,17 @@ JX.require('AphlictListener', __dirname); JX.install('AphlictListenerList', { - construct : function() { + construct: function() { this._listeners = {}; }, - members : { - _listeners : null, - _nextID : 0, - _activeListenerCount : 0, - _totalListenerCount : 0, + members: { + _listeners: null, + _nextID: 0, + _activeListenerCount: 0, + _totalListenerCount: 0, - addListener : function(socket) { + addListener: function(socket) { var listener = new JX.AphlictListener( this._generateNextID(), socket); @@ -24,7 +24,7 @@ return listener; }, - removeListener : function(listener) { + removeListener: function(listener) { var id = listener.getID(); if (id in this._listeners) { delete this._listeners[id]; @@ -32,19 +32,19 @@ } }, - getListeners : function() { + getListeners: function() { return this._listeners; }, - getActiveListenerCount : function() { + getActiveListenerCount: function() { return this._activeListenerCount; }, - getTotalListenerCount : function() { + getTotalListenerCount: function() { return this._totalListenerCount; }, - _generateNextID : function() { + _generateNextID: function() { do { this._nextID = ((this._nextID + 1) % 1000000000000); } while (this._nextID in this._listeners); diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js --- a/support/aphlict/server/lib/AphlictLog.js +++ b/support/aphlict/server/lib/AphlictLog.js @@ -4,16 +4,16 @@ var util = require('util'); JX.install('AphlictLog', { - construct : function() { + construct: function() { this._writeToLogs = []; this._writeToConsoles = []; }, - members : { - _writeToConsoles : null, - _writeToLogs : null, + members: { + _writeToConsoles: null, + _writeToLogs: null, - addLogfile : function(path) { + addLogfile: function(path) { var options = { flags: 'a', encoding: 'utf8', @@ -27,12 +27,12 @@ return this; }, - addConsole : function(console) { + addConsole: function(console) { this._writeToConsoles.push(console); return this; }, - log : function(pattern) { + log: function(pattern) { var str = util.format.apply(null, arguments); var date = new Date().toLocaleString(); str = '[' + date + '] ' + str; diff --git a/support/aphlict/server/lib/javelin.js b/support/aphlict/server/lib/javelin.js --- a/support/aphlict/server/lib/javelin.js +++ b/support/aphlict/server/lib/javelin.js @@ -6,7 +6,7 @@ // NOTE: This is faking out a piece of code in JX.install which waits for // Stratcom before running static initializers. -JX.Stratcom = {ready : true}; +JX.Stratcom = {ready: true}; JX.require('core/Event'); JX.require('core/Stratcom');