diff --git a/resources/celerity/map.php b/resources/celerity/map.php --- a/resources/celerity/map.php +++ b/resources/celerity/map.php @@ -8,7 +8,7 @@ 'names' => array( 'core.pkg.css' => 'd82d2f53', - 'core.pkg.js' => '88ca2043', + 'core.pkg.js' => '4af4aa9d', 'darkconsole.pkg.js' => 'ca8671ce', 'differential.pkg.css' => '4a93db37', 'differential.pkg.js' => 'eca39a2c', @@ -334,9 +334,9 @@ 'rsrc/image/texture/table_header.png' => '5c433037', 'rsrc/image/texture/table_header_hover.png' => '038ec3b9', 'rsrc/image/texture/table_header_tall.png' => 'd56b434f', - 'rsrc/js/application/aphlict/Aphlict.js' => '493665ee', + 'rsrc/js/application/aphlict/Aphlict.js' => '08be8878', 'rsrc/js/application/aphlict/behavior-aphlict-dropdown.js' => '2a2dba85', - 'rsrc/js/application/aphlict/behavior-aphlict-listen.js' => '027c888a', + 'rsrc/js/application/aphlict/behavior-aphlict-listen.js' => 'acda9f51', 'rsrc/js/application/auth/behavior-persona-login.js' => '9414ff18', 'rsrc/js/application/config/behavior-reorder-fields.js' => '938aed89', 'rsrc/js/application/conpherence/behavior-menu.js' => '7ee23816', @@ -477,7 +477,7 @@ 'rsrc/js/phuix/PHUIXActionListView.js' => 'b5c256b8', 'rsrc/js/phuix/PHUIXActionView.js' => '6e8cefa4', 'rsrc/js/phuix/PHUIXDropdownMenu.js' => 'bd4c8dca', - 'rsrc/swf/aphlict.swf' => 'f45c3edc', + 'rsrc/swf/aphlict.swf' => 'd9bca85d', ), 'symbols' => array( @@ -525,10 +525,10 @@ 'herald-rule-editor' => '22d2966a', 'herald-test-css' => '778b008e', 'inline-comment-summary-css' => '8cfd34e8', - 'javelin-aphlict' => '493665ee', + 'javelin-aphlict' => '08be8878', 'javelin-behavior' => '8a3ed18b', 'javelin-behavior-aphlict-dropdown' => '2a2dba85', - 'javelin-behavior-aphlict-listen' => '027c888a', + 'javelin-behavior-aphlict-listen' => 'acda9f51', 'javelin-behavior-aphront-basic-tokenizer' => 'b3a4b884', 'javelin-behavior-aphront-crop' => 'b98fc918', 'javelin-behavior-aphront-drag-and-drop-textarea' => '4a11ea9c', @@ -819,18 +819,6 @@ 4 => 'javelin-vector', 5 => 'differential-inline-comment-editor', ), - '027c888a' => - array( - 0 => 'javelin-behavior', - 1 => 'javelin-aphlict', - 2 => 'javelin-stratcom', - 3 => 'javelin-request', - 4 => 'javelin-uri', - 5 => 'javelin-dom', - 6 => 'javelin-json', - 7 => 'javelin-router', - 8 => 'phabricator-notification', - ), '029a133d' => array( 0 => 'aphront-dialog-view-css', @@ -865,6 +853,11 @@ 3 => 'javelin-vector', 4 => 'javelin-stratcom', ), + '08be8878' => + array( + 0 => 'javelin-install', + 1 => 'javelin-util', + ), '08e56a4e' => array( 0 => 'javelin-install', @@ -1153,11 +1146,6 @@ 2 => 'javelin-stratcom', 3 => 'phabricator-tooltip', ), - '493665ee' => - array( - 0 => 'javelin-install', - 1 => 'javelin-util', - ), '4a11ea9c' => array( 0 => 'javelin-behavior', @@ -1620,6 +1608,18 @@ 1 => 'javelin-dom', 2 => 'javelin-stratcom', ), + 'acda9f51' => + array( + 0 => 'javelin-behavior', + 1 => 'javelin-aphlict', + 2 => 'javelin-stratcom', + 3 => 'javelin-request', + 4 => 'javelin-uri', + 5 => 'javelin-dom', + 6 => 'javelin-json', + 7 => 'javelin-router', + 8 => 'phabricator-notification', + ), 'ad7a69ca' => array( 0 => 'javelin-install', diff --git a/src/view/page/PhabricatorStandardPageView.php b/src/view/page/PhabricatorStandardPageView.php --- a/src/view/page/PhabricatorStandardPageView.php +++ b/src/view/page/PhabricatorStandardPageView.php @@ -376,16 +376,23 @@ $swf_uri = $response->getURI($map, 'rsrc/swf/aphlict.swf'); $enable_debug = PhabricatorEnv::getEnvConfig('notification.debug'); + + $subscriptions = $this->pageObjects; + if ($user) { + $subscriptions[] = $user->getPHID(); + } + Javelin::initBehavior( 'aphlict-listen', array( - 'id' => $aphlict_object_id, - 'containerID' => $aphlict_container_id, - 'server' => $client_uri->getDomain(), - 'port' => $client_uri->getPort(), - 'debug' => $enable_debug, - 'swfURI' => $swf_uri, - 'pageObjects' => array_fill_keys($this->pageObjects, true), + 'id' => $aphlict_object_id, + 'containerID' => $aphlict_container_id, + 'server' => $client_uri->getDomain(), + 'port' => $client_uri->getPort(), + 'debug' => $enable_debug, + 'swfURI' => $swf_uri, + 'pageObjects' => array_fill_keys($this->pageObjects, true), + 'subscriptions' => $subscriptions, )); $tail[] = phutil_tag( diff --git a/support/aphlict/client/src/AphlictClient.as b/support/aphlict/client/src/AphlictClient.as --- a/support/aphlict/client/src/AphlictClient.as +++ b/support/aphlict/client/src/AphlictClient.as @@ -2,6 +2,7 @@ import flash.events.TimerEvent; import flash.external.ExternalInterface; + import flash.utils.Dictionary; import flash.utils.Timer; @@ -43,7 +44,11 @@ {}); } - public function externalConnect(server:String, port:Number):void { + public function externalConnect( + server:String, + port:Number, + subscriptions:Array):void { + this.externalInvoke('connect'); this.remoteServer = server; @@ -56,6 +61,10 @@ this.timer.addEventListener(TimerEvent.TIMER, this.keepalive); this.connectToMaster(); + + // Send subscriptions to master. + this.log('Sending subscriptions to master.'); + this.send.send('aphlict_master', 'subscribe', this.client, subscriptions); } /** diff --git a/support/aphlict/client/src/AphlictMaster.as b/support/aphlict/client/src/AphlictMaster.as --- a/support/aphlict/client/src/AphlictMaster.as +++ b/support/aphlict/client/src/AphlictMaster.as @@ -40,6 +40,11 @@ */ private var remotePort:Number; + /** + * A dictionary mapping PHID to subscribed clients. + */ + private var subscriptions:Dictionary; + private var socket:Socket; private var readBuffer:ByteArray; @@ -50,12 +55,13 @@ this.remoteServer = server; this.remotePort = port; + this.clients = new Dictionary(); + this.subscriptions = new Dictionary(); + // Connect to the Aphlict Server. this.recv.connect('aphlict_master'); this.connectToServer(); - this.clients = new Dictionary(); - // Start a timer and regularly purge dead clients. this.timer = new Timer(AphlictMaster.PURGE_INTERVAL); this.timer.addEventListener(TimerEvent.TIMER, this.purgeClients); @@ -116,6 +122,16 @@ private function didConnectSocket(event:Event):void { this.externalInvoke('connected'); + + // Send subscriptions + var phids = new Array(); + for (var phid:String in this.subscriptions) { + phids.push(phid); + } + + if (phids.length) { + this.sendSubscribeCommand(phids); + } } private function didCloseSocket(event:Event):void { @@ -130,6 +146,69 @@ this.externalInvoke('error', event.text); } + public function subscribe(client:String, phids:Array):void { + var newPHIDs = new Array(); + + for (var i:String in phids) { + var phid = phids[i]; + if (!this.subscriptions[phid]) { + this.subscriptions[phid] = new Dictionary(); + newPHIDs.push(phid); + } + this.subscriptions[phid][client] = true; + } + + if (newPHIDs.length) { + this.sendSubscribeCommand(newPHIDs); + } + } + + public function unsubscribe(client:String, phids:Array):void { + var oldPHIDs = new Array(); + + for (var phid:String in phids) { + if (!this.subscriptions[phid]) { + continue; + } + + delete this.subscriptions[phid][client]; + + var empty = true; + for (var key:String in this.subscriptions[phid]) { + empty = false; + } + + if (empty) { + delete this.subscriptions[phid]; + oldPHIDs.push(phid); + } + } + + if (oldPHIDs.length) { + this.sendUnsubscribeCommand(oldPHIDs); + } + } + + private function sendSubscribeCommand(phids:Array):void { + var msg:Dictionary = new Dictionary(); + msg['command'] = 'subscribe'; + msg['data'] = phids; + + this.log('Sending subscribe command to server.'); + this.socket.writeUTF(vegas.strings.JSON.serialize(msg)); + this.socket.flush(); + } + + private function sendUnsubscribeCommand(phids:Array):void { + var msg:Dictionary = new Dictionary(); + msg['command'] = 'unsubscribe'; + msg['data'] = phids; + + this.log('Sending subscribe command to server.'); + this.socket.writeUTF(vegas.strings.JSON.serialize(msg)); + this.socket.flush(); + } + private function didReceiveSocket(event:Event):void { try { var b:ByteArray = this.readBuffer; @@ -153,8 +232,22 @@ // Send the message to all clients. for (var client:String in this.clients) { - this.log('Sending message to client: ' + client); - this.send.send(client, 'receiveMessage', data); + var subscribed = false; + + for (var i:String in data.subscribers) { + var phid = data.subscribers[i]; + + if (this.subscriptions[phid] && + this.subscriptions[phid][client]) { + subscribed = true; + break; + } + } + + if (subscribed) { + this.log('Sending message to client: ' + client); + this.send.send(client, 'receiveMessage', data); + } } } else { break; diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js --- a/support/aphlict/server/aphlict_server.js +++ b/support/aphlict/server/aphlict_server.js @@ -83,6 +83,61 @@ listener.getDescription(), socket.remoteAddress); + var buffer = new Buffer([]); + var length = 0; + + socket.on('data', function(data) { + buffer = Buffer.concat([buffer, new Buffer(data)]); + + while (buffer.length) { + if (!length) { + length = buffer.readUInt16BE(0); + buffer = buffer.slice(2); + } + + if (buffer.length < length) { + // We need to wait for the rest of the data. + return; + } + + var message; + try { + message = JSON.parse(buffer.toString('utf8', 0, length)); + } catch (err) { + debug.log('<%s> Received invalid data.', listener.getDescription()); + continue; + } finally { + buffer = buffer.slice(length); + length = 0; + } + + debug.log('<%s> Received data: %s', + listener.getDescription(), + JSON.stringify(message)); + + switch (message.command) { + case 'subscribe': + debug.log( + '<%s> Subscribed to: %s', + listener.getDescription(), + JSON.stringify(message.data)); + listener.subscribe(message.data); + break; + + case 'unsubscribe': + debug.log( + '<%s> Unsubscribed from: %s', + listener.getDescription(), + JSON.stringify(message.data)); + listener.unsubscribe(message.data); + break; + + default: + debug.log(' Unrecognized command.', listener.getDescription()); + } + } + }); + socket.on('close', function() { clients.removeListener(listener); debug.log('<%s> Disconnected', listener.getDescription()); @@ -122,7 +177,7 @@ debug.log('notification: ' + JSON.stringify(msg)); ++messages_in; - broadcast(msg); + transmit(msg); response.writeHead(200, {'Content-Type': 'text/plain'}); } catch (err) { @@ -161,12 +216,16 @@ }).listen(config.admin, config.host); -function broadcast(data) { - var listeners = clients.getListeners(); - for (var id in listeners) { - var listener = listeners[id]; +function transmit(msg) { + var listeners = clients.getListeners().filter(function(client) { + return client.isSubscribedToAny(msg.subscribers); + }); + + for (var i = 0; i < listeners.length; i++) { + var listener = listeners[i]; + try { - listener.writeMessage(data); + listener.writeMessage(msg); ++messages_out; debug.log('<%s> Wrote Message', listener.getDescription()); diff --git a/support/aphlict/server/lib/AphlictListener.js b/support/aphlict/server/lib/AphlictListener.js --- a/support/aphlict/server/lib/AphlictListener.js +++ b/support/aphlict/server/lib/AphlictListener.js @@ -9,11 +9,37 @@ members: { _id: null, _socket: null, + _subscriptions: {}, getID: function() { return this._id; }, + subscribe: function(phids) { + for (var i = 0; i < phids.length; i++) { + var phid = phids[i]; + this._subscriptions[phid] = true; + } + + return this; + }, + + unsubscribe: function(phids) { + for (var i = 0; i < phids.length; i++) { + var phid = phids[i]; + delete this._subscriptions[phid]; + } + + return this; + }, + + isSubscribedToAny: function(phids) { + var intersection = phids.filter(function(phid) { + return phid in this._subscriptions; + }, this); + return intersection.length > 0; + }, + getSocket: function() { return this._socket; }, diff --git a/support/aphlict/server/lib/AphlictListenerList.js b/support/aphlict/server/lib/AphlictListenerList.js --- a/support/aphlict/server/lib/AphlictListenerList.js +++ b/support/aphlict/server/lib/AphlictListenerList.js @@ -31,7 +31,14 @@ }, getListeners: function() { - return this._listeners; + var keys = Object.keys(this._listeners); + var listeners = []; + + for (var i = 0; i < keys.length; i++) { + listeners.push(this._listeners[keys[i]]); + } + + return listeners; }, getActiveListenerCount: function() { diff --git a/webroot/rsrc/js/application/aphlict/Aphlict.js b/webroot/rsrc/js/application/aphlict/Aphlict.js --- a/webroot/rsrc/js/application/aphlict/Aphlict.js +++ b/webroot/rsrc/js/application/aphlict/Aphlict.js @@ -25,7 +25,7 @@ */ JX.install('Aphlict', { - construct : function(id, server, port) { + construct : function(id, server, port, subscriptions) { if (__DEV__) { if (JX.Aphlict._instance) { JX.$E('Aphlict object is sort of a singleton..!'); @@ -36,6 +36,7 @@ this._server = server; this._port = port; + this._subscriptions = subscriptions; // Flash puts its "objects" into global scope in an inconsistent way, // because it was written in like 1816 when globals were awesome and IE4 @@ -48,8 +49,12 @@ members : { _server : null, _port : null, + _subscriptions : null, start : function() { - this._flashContainer.connect(this._server, this._port); + this._flashContainer.connect( + this._server, + this._port, + this._subscriptions); } }, diff --git a/webroot/rsrc/js/application/aphlict/behavior-aphlict-listen.js b/webroot/rsrc/js/application/aphlict/behavior-aphlict-listen.js --- a/webroot/rsrc/js/application/aphlict/behavior-aphlict-listen.js +++ b/webroot/rsrc/js/application/aphlict/behavior-aphlict-listen.js @@ -16,7 +16,13 @@ var showing_reload = false; function onready() { - var client = new JX.Aphlict(config.id, config.server, config.port) + var client = new JX.Aphlict( + config.id, + config.server, + config.port, + config.subscriptions); + + client .setHandler(onaphlictmessage) .start(); } diff --git a/webroot/rsrc/swf/aphlict.swf b/webroot/rsrc/swf/aphlict.swf index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 GIT binary patch literal 0 Hc$@