How to use RxJS with Socket.IO events

I want to use RxJS inside my socket.on('sense', function(data){}) handler. I am stuck: there is very little documentation available, and I don't understand RxJS well. Here is my problem.

I have a distSensor.js that has a function pingEnd()

function pingEnd(x){
    socket.emit("sense", dist); // pingEnd is fired when an interrupt is generated.
}

Inside my App.js I have

io.on('connection', function (socket) {
    socket.on('sense', function (data) {
        //console.log('sense from App4 was called ' + data);
    });
});

The sense handler receives a lot of sensor data, which I want to filter using RxJS, but I don't know what to do next to use RxJS here. Any pointers to the right docs or a sample would help.

You can use Rx.Observable.fromEvent.

Here’s how I did a similar thing using Bacon.js, which has a very similar API:

So in Bacon.js it would go like this:

io.on('connection', function(socket){
  Bacon.fromEvent(socket, "sense")
    .filter(function(data) { return true })
    .forEach(function(data) { dealWith(data) })
})

And in RxJs you’d replace Bacon.fromEvent with Rx.Observable.fromEvent.

I have experienced some strange issues using the fromEvent method, so I prefer just to create my own Observable:

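function RxfromIO (io, eventName) {
  return Rx.Observable.create(observer => {
    io.on(eventName, (data) => {
      // assumed body: forward each event to the observer (RxJS 4 naming)
      observer.onNext(data);
    });
    return {
        dispose : io.close
    };
  });
}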

I can then use it like this:

let $connection = RxfromIO(io, 'connection');
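
As far as I can tell, the dispose handler above hands over io.close, which shuts the connection down rather than detaching the one listener that was added. A narrower teardown removes just that listener. Here is a minimal sketch of the idea, assuming Node's built-in EventEmitter as a stand-in for the socket; listen and fakeSocket are hypothetical names, not part of RxJS or Socket.IO:

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: attach a listener and return a teardown function
// that detaches exactly that listener and nothing else.
function listen(emitter, eventName, onData) {
  const handler = data => onData(data);
  emitter.on(eventName, handler);
  return () => emitter.removeListener(eventName, handler);
}

const fakeSocket = new EventEmitter(); // stand-in for a Socket.IO socket
const received = [];

const stop = listen(fakeSocket, 'sense', d => received.push(d));
fakeSocket.emit('sense', 12); // delivered
stop();                       // detach the listener
fakeSocket.emit('sense', 99); // no listener left, so this is ignored
```

Inside Rx.Observable.create, the value returned from the subscribe function would play the role of stop here.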

You can create an Observable like so:

var senses = Rx.Observable.fromEventPattern(
    function add (h) {
        // assumed body: register h as the 'sense' listener
        socket.on('sense', h);
    }
);

Then use senses like any other Observable.

Simply use fromEvent(). Here is a full example in Node.js; it works the same way in the browser. Note that I use first() and takeUntil() to prevent a memory leak: first() listens for a single event and then completes, so disconnect$ completes after the first disconnect. Apply takeUntil(disconnect$) to every other socket event you listen to, so those observables complete on disconnect as well:

const app = require('express')();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
const Rx = require('rxjs/Rx');

const connection$ = Rx.Observable.fromEvent(io, 'connection');

connection$.subscribe(socket => {
    console.log(`Client connected`);

    // Observables
    const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect').first();
    const message$ = Rx.Observable.fromEvent(socket, 'message').takeUntil(disconnect$);

    // Subscriptions
    message$.subscribe(data => {
        console.log(`Got message from client with data: ${data}`);
        io.emit('message', data); // Emit to all clients
    });

    disconnect$.subscribe(() => {
        console.log(`Client disconnected`);
    });
});
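
To see why first() and takeUntil() matter, it can help to look at what they amount to at the listener level. The following is only a rough analogue written against Node's built-in EventEmitter, not RxJS itself: once stands in for first(), and removing the message listener on disconnect stands in for takeUntil(disconnect$). The socket object here is a plain emitter used as a stand-in.

```javascript
const { EventEmitter } = require('events');

const socket = new EventEmitter(); // stand-in for a connected client socket
const log = [];

const onMessage = data => log.push(data);
socket.on('message', onMessage);

// `once` removes its own handler after the first 'disconnect', like first().
socket.once('disconnect', () => {
  // Stop listening for messages once the client is gone, like takeUntil().
  socket.removeListener('message', onMessage);
});

socket.emit('message', 'a'); // recorded
socket.emit('disconnect');   // triggers the cleanup
socket.emit('message', 'b'); // nobody is listening any more
```

Without that cleanup step the message handler would stay registered for as long as the socket object is alive.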


ES6 one-liner that I use, using the proposed bind-operator syntax (::), which requires a transpiler such as Babel:
(read $ as stream)

import { Observable } from 'rxjs'

// create socket

const message$ = Observable.create($ => socket.on('message', ::$.next))
// translates to: Observable.create($ => socket.on('message', $.next.bind($)))

// filter example
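const subscription = message$
  .filter(message => message.text !== 'spam')
  //or .filter(({ text }) => text !== 'spam')
  .subscribe(message => console.log(message)) // assumed: the subscribe call is cut off in the source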
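
The binding is the part that is easy to get wrong: a method passed as a bare callback loses its receiver, so it has to be bound to the object it came from. A small plain-JavaScript sketch of the difference, where observer is a hypothetical stand-in object and not an RxJS observer:

```javascript
// Minimal stand-in for an observer; only `next` matters for this demo.
const observer = {
  values: [],
  next(value) {
    this.values.push(value);
  }
};

// Equivalent of `::observer.next`: the method stays tied to its object.
const bound = observer.next.bind(observer);
bound('hello');

// Passing the bare method loses `this`, so the call throws a TypeError.
const unbound = observer.next;
let failed = false;
try {
  unbound('oops');
} catch (err) {
  failed = err instanceof TypeError;
}
```

An arrow function such as message => $.next(message) achieves the same thing without the bind operator.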

You can use rxjs-dom:

Rx.DOM.fromWebSocket(url, protocol, [openObserver], [closeObserver])

// an observer for when the socket is open
var openObserver = Rx.Observer.create(function(e) {
  console.info('socket open');

  // Now it is safe to send a message
});

// an observer for when the socket is about to close
var closingObserver = Rx.Observer.create(function() {
  console.log('socket is about to close');
});

// create a web socket subject
socket = Rx.DOM.fromWebSocket(
  url, // address of the WebSocket server
  null, // no protocol
  openObserver,
  closingObserver);

// subscribing creates the underlying socket and will emit a stream of incoming
// message events
socket.subscribe(
  function(e) {
    console.log('message: %s', e.data);
  },
  function(e) {
    // errors and "unclean" closes land here
    console.error('error: %s', e);
  },
  function() {
    // the socket has been closed
    console.info('socket closed');
  }
);

The answers/resolutions are collected from Stack Overflow and are licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.