Stomp Client Multiplexing

WebSockets are great. Even when they are not supported, we can usually make do with shim technologies such as SockJS to provide the main features offered by WebSockets. Something that neither WebSockets nor the shim technologies support, however, is the ability to maintain several clients from one browser session (granted, this is a bit beyond the WebSocket specification, but it is still a fact we must cope with).

Why is this an issue? An example of the problems that may arise:

An application is using STOMP over WebSockets, such as is provided by the RabbitMQ Web Stomp Plugin. In a moderately complex application, several independent modules would need WebSocket access to listen to queues and send messages. The simplest approach would be to let each module initialize its own client, to minimize the inter-dependencies between modules. This is prevented by many browser WebSocket implementations, as well as by limitations in the shim libraries due to limits on the number of HTTP connections. Shim libraries usually have both long and short HTTP polling as a fallback strategy.

To get around this we can implement multiplexing in the client application to simulate the ability to maintain several concurrent WebSocket connections. I made a quick attempt at this in an angular context using the sockjs client and the stomp client library recommended by RabbitMQ.

.service('Guid',[function() {
        return {
            generate : function() {
                function s4() {
                    return Math.floor((1 + Math.random()) * 0x10000)
                        .toString(16)
                        .substring(1);
                }
                return s4() + s4() + '-' + s4() + '-' + s4() + '-' +
                    s4() + '-' + s4() + s4() + s4();
            }
        }
    }])
    .service('StompClientFactory', ['$q', 'Guid', function($q, Guid) {
        var _self = {};

        var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp');
        var stompClient = Stomp.over(ws);
        // SockJS does not support heart-beat: disable heart-beats
        stompClient.heartbeat.outgoing = 0;
        stompClient.heartbeat.incoming = 0;

        var deferred = $q.defer();

        //The stomp interface presented to the rest of the angular app
        function makeWrappedClient(stompClient) {
            var that = {};
            var listenerIds = [];
            function makeListenerId() {
                var listenerId = Guid.generate();
                listenerIds.push(listenerId);
                return listenerId;
            };
            that.makeSender = function(sendDestination) {
                return function(data) {
                    stompClient.send(sendDestination, {'content-type':'application/json'}, JSON.stringify(data));
                };
            };
            that.makeTempSender = function(sendDestination, callback) {
                var listenerId = makeListenerId();
                var temporaryQueue = '/temp-queue/'+listenerId;
                _self.registerListener(listenerId, temporaryQueue, callback);
                return function(msgBody) {
                    stompClient.send( sendDestination, {
                        'reply-to': temporaryQueue,
                        'content-type':'application/json'
                    }, JSON.stringify(msgBody));
                };
            };
            that.send = function(sendDestination, msgBody) {
                stompClient.send(sendDestination, {'content-type':'application/json'}, JSON.stringify(msgBody));
            };
            that.listenTo = function(receiveDestination, callback) {
                var listenerId = makeListenerId();
                _self.registerListener(listenerId, receiveDestination, callback);
                _self.stompListenTo(receiveDestination);
            };
            that.destroy = function() {
                angular.forEach(listenerIds, function(listenerId) {
                    _self.removeListener(listenerId);
                });
            };
            return that;
        };

        //Listeners are the local callbacks, they are all associated with a destination
        var listeners = {};
        _self.registerListener = function(id, destination, func) {
            listeners[id] = {
                destination : destination,
                listenFunction : func
            };
        };
        _self.removeListener = function(id) {
            delete listeners[id];
        };

        //Here is where the local routing happens.
        function notifyListeners(msg) {
            angular.forEach(listeners, function(listener) {
                if(listener.destination === msg.headers.destination) {
                    msg.body = JSON.parse(msg.body);
                    listener.listenFunction(msg);
                }
            });
        };
        _self.stompListenTo = function(destination) {
            stompClient.subscribe(destination, function(d) {
                notifyListeners(d);
            });
        };

        var on_connect = function(x) {
            deferred.resolve(makeWrappedClient(stompClient));
        };
        var on_error =  function() {
            deferred.reject("Failed to connect to message broker");
        };

        //Called when messages are received on a temporary channel
        stompClient.onreceive = function(m) {
            notifyListeners(m);
        };

        stompClient.connect('guest', 'guest', on_connect, on_error, '/');

        return deferred.promise;
    }])

And then using it.

        // Example: wait for the shared connection, then exercise both a
        // permanent queue and a request/reply exchange over a temporary queue.
        StompClientFactory.then(function success(client){
            console.log('connected to rabbitmq');

            //using it with permanent queues
            client.listenTo('/amq/queue/a-client', function(data) {
                console.log('got this on a-client queue:');
                console.log(data.body);
            });
            var send = client.makeSender('/amq/queue/a-client');
            send({
                msg: '123',
                status: true
            });

            //Using it with temporary queues
            // This listener plays the "server": it answers every request on
            // the destination named in the request's reply-to header.
            client.listenTo('/queue/temporary-client', function(msg) {
                console.log('got this on tqueue queue:');
                console.log(msg.body);
                client.send(msg.headers['reply-to'], {
                    msg: "got the message on the temporary queue"
                });
            });

            var temporarySender = client.makeTempSender('/queue/temporary-client',function(d){
                // Log the label and the payload separately: concatenating the
                // frame object onto the string would print "[object Object]".
                console.log("Got this back on the tmp queue:");
                console.log(d.body);
            });
            temporarySender({
                msg: "temporaryMsg"
            });
        }, function error() {
            console.log('failed to connect to rabbitmq');
        });

I will add a git repo containing this example code, and will probably refine it a bit before then.

Leave a Reply

Your email address will not be published. Required fields are marked *