
Establishing a Websocket PUBSUB server with Redis and Asyncio

April 17, 2017

Motivation

If you've been following my never-ending quest for a light-sensor setup, as detailed first in this post (set up a Raspberry Pi with Python 3.5) and then in this other post (make a light sensor with an Arduino and Raspberry Pi), you know that I've only set up the data-collection half of this project. The other half involves getting the data to make hardware do stuff in the real world. This post covers how to use a PUBSUB (publish-subscribe) Redis server and websockets for this purpose.

What is PUBSUB?

Publish Subscribe idea

The idea behind publish/subscribe is simple. There are subscribers, which are clients that listen for changes on a set of data. Then there are publishers, which are clients that edit the set of data. Whenever the data is edited by a publisher, all the subscribers that are subscribed to that publisher hear it and act on it! Redis is a data store that supports this behavior out of the box, so we'll be using it as our data store. For this light sensor project, we'll have one publisher (the light sensor) and potentially many subscribers (such as this browser page that is tracking the light levels in the Yeti office currently, a client program writing to a database, or even a smart mechanical shutter).
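To make the idea concrete before Redis enters the picture, here is a minimal in-memory sketch of publish/subscribe in plain Python. The Broker class and the subscriber names are hypothetical and exist only for illustration; Redis does the same job across processes and machines. Like Redis's PUBLISH, the publish method here returns the number of subscribers that received the message.

```python
from collections import defaultdict


class Broker:
    """Toy in-memory stand-in for a PUBSUB server (hypothetical, not Redis)."""

    def __init__(self):
        # Maps a channel name to the list of subscriber callbacks
        self._subscribers = defaultdict(list)

    def subscribe(self, channel, callback):
        self._subscribers[channel].append(callback)

    def publish(self, channel, message):
        # Deliver to every current subscriber and report how many got it
        for callback in self._subscribers[channel]:
            callback(message)
        return len(self._subscribers[channel])


broker = Broker()
dashboard, shutter = [], []  # two pretend subscribers
broker.subscribe('lightlevel', dashboard.append)
broker.subscribe('lightlevel', shutter.append)

delivered = broker.publish('lightlevel', '512')  # the pretend light sensor
print(delivered, dashboard, shutter)
```

One publisher call reaches both subscribers, and neither side needs to know the other exists. That decoupling is what lets us add a database writer or a shutter later without touching the sensor.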

The High Level

We'll be running a Redis server and two Python websocket servers (one for accepting PUBLISH signals and another for accepting SUBSCRIBE signals) on a Debian VPS (e.g. an AWS EC2 instance, Digital Ocean droplet, etc.), exposing only the websocket server processes with Nginx. The websocket servers will read from and write to the Redis server. I opted to use websockets rather than expose Redis directly simply because exposing Redis would potentially involve editing my iptables, which I am reluctant to do.

High level websocket/redis server architecture

Procedure

All the steps here will be done SSH'd in on the VPS.

Installing Redis Server

Redis is a server that is useful for storing key-value pairs in memory. For web devs, it tends to be used as a backend cache to speed up DB queries. By default it listens for connections on port 6379. To install this wonderful piece of software, follow the Redis folks' pretty sweet set of documentation here. I especially like being able to interact with Redis directly through the redis-cli tool (which is handy for debugging). You can try it out after installation by typing redis-cli in the terminal.

Test out Redis Server

We can try out some Redis functionality ad-hoc with the redis-cli tool.

You can set values to arbitrary keys.

127.0.0.1:6379> SET test me
OK

You can get values by key

127.0.0.1:6379> GET test
"me"

Deleting key-value pairs is easy too.

127.0.0.1:6379> DEL test
(integer) 1
127.0.0.1:6379> GET test
(nil)

Now let's try some PUBSUB behavior. I tend to run multiple terminals at a time to test out multiple processes like this. With two windows SSH'd into the VPS (and redis-cli running in both), try subscribing to a channel in one...

127.0.0.1:6379> SUBSCRIBE test_channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "test_channel"
3) (integer) 1

... while publishing on another...

127.0.0.1:6379> PUBLISH test_channel "Hello World"
(integer) 1

... and then looking at the "subscribe" terminal to see the result.

127.0.0.1:6379> SUBSCRIBE test_channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "test_channel"
3) (integer) 1
1) "message"
2) "test_channel"
3) "Hello World"  # <- There is the published message

So very cool.

More Setup

To connect websocket clients to Redis from Python, an approach that makes sense is to use the aioredis library (an asyncio client for Redis) in conjunction with the websockets library. At this point, you'll want to install Python 3.5 on this VPS, as well as set up a virtualenv for the websocket servers. In the directory where all this code will live, you can create a requirements.txt file with the following dependencies...

aioredis==0.3.0
appdirs==1.4.0
hiredis==0.2.0
websockets==3.2

... and once you've activated your Python virtual environment, run pip install -r path/to/requirements.txt

A Consumer Websocket Server

The websockets Python library is pretty well documented and lays out basic usage with two common patterns. One type of websocket server pattern is called a consumer, in which the websocket listens to incoming traffic and then pipes in the processed data to a "consumer" co-routine, in this example a call to Redis's PUBLISH function. Here is an example of this below:

# consumer.py

import asyncio
from aioredis import create_connection
import websockets

async def publish_to_redis(conn, msg, path):
    # Publish to channel "lightlevel{path}"
    await conn.execute('publish', 'lightlevel{}'.format(path), msg)


async def server(websocket, path):
    # Open one Redis connection per websocket client and reuse it,
    # rather than opening (and leaking) a new one for every message
    conn = await create_connection(('localhost', 6379))
    try:
        while True:
            # Receive data from "the outside world"
            message = await websocket.recv()

            # Feed this data to the PUBLISH co-routine
            await publish_to_redis(conn, message, path)

            await asyncio.sleep(1)

    except websockets.exceptions.ConnectionClosed:
        print('Connection Closed!')

    finally:
        # Release the Redis connection once the client is gone
        conn.close()
        await conn.wait_closed()

if __name__ == '__main__':
    # Boiler-plate for the websocket server, running on localhost, port 8765
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    ws_server = websockets.serve(server, 'localhost', 8765)
    loop.run_until_complete(ws_server)
    loop.run_forever()

There are a few things to note here:

  • The server coroutine method covers what happens during the course of a single connection with a websocket client. This means if we want the websocket to stay open, the code needs to provide an infinite loop with the proper fallbacks.

  • The server coroutine takes two arguments: the websocket argument, which represents the websocket itself, and a path argument, which holds the path of the websocket connection. So if I'm running a websocket at ws://example.com and a browser connects to ws://example.com/super/path, then path in this connection would evaluate to "/super/path" (leading slash included). This means we can use path to set up things like namespaced PUBSUB, so that a new server doesn't have to be created if I want to, say, have multiple different light sensors with multiple different subscribers. This is exactly like having multiple chatrooms with different users in each one.

  • To run this server, activate the virtual environment that carries the dependencies in the requirements.txt file, and run this file like so.

    $ python consumer.py
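The namespacing described in the notes above comes down to one string operation. Here is a small sketch of it, pulled out into a hypothetical helper named channel_for (the servers in this post inline the same format call). It assumes the path arrives with its leading slash, which is how the websockets library hands over the request path.

```python
def channel_for(path, prefix='lightlevel'):
    """Build the Redis channel name the same way the server code does."""
    return '{}{}'.format(prefix, path)


# Two sensors connecting on different paths end up on different channels
print(channel_for('/office'))
print(channel_for('/super/path'))
```

A subscriber has to connect with the same path as the sensor it cares about, otherwise it ends up listening on a channel nobody publishes to.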
    

A Producer Websocket Server

The other pattern is called a producer websocket server. This takes data from a producer coroutine and pushes it to the websocket client. In this case, the producer is the Redis SUBSCRIBE call, which produces data whenever data is published to the appropriate channel.

# producer.py

import asyncio
from aioredis import create_connection, Channel
import websockets

async def subscribe_to_redis(path):
    conn = await create_connection(('localhost', 6379))

    # Set up a subscribe channel
    channel = Channel('lightlevel{}'.format(path), is_pattern=False)
    await conn.execute_pubsub('subscribe', channel)
    return channel, conn


async def browser_server(websocket, path):
    channel, conn = await subscribe_to_redis(path)
    try:
        while True:
            # Wait until data is published to this channel
            message = await channel.get()

            # Send unicode decoded data over to the websocket client
            await websocket.send(message.decode('utf-8'))

    except websockets.exceptions.ConnectionClosed:
        # The websocket client went away; cleanup happens below
        pass

    finally:
        # Free up channel however the loop ends (disconnect or error)
        await conn.execute_pubsub('unsubscribe', channel)
        conn.close()

if __name__ == '__main__':
    # Runs a server process on 8767. Just do 'python producer.py'
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    ws_server = websockets.serve(browser_server, 'localhost', 8767)
    loop.run_until_complete(ws_server)
    loop.run_forever()
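Stripped of Redis and websockets, the producer pattern is just "wait for the next item, then push it to the client". The sketch below shows that shape using only the standard library: an asyncio.Queue stands in for the Redis channel and a hypothetical fake_send coroutine stands in for websocket.send. The None sentinel is an assumption added so the demo can terminate. It uses asyncio.run, which needs Python 3.7 or newer; on 3.5 you would use loop.run_until_complete instead.

```python
import asyncio


async def forward(queue, send):
    """Relay every item from the queue to `send` until a None sentinel arrives."""
    while True:
        message = await queue.get()  # blocks until something is "published"
        if message is None:
            break
        await send(message)


async def demo():
    queue = asyncio.Queue()
    received = []

    async def fake_send(message):
        # Stand-in for websocket.send()
        received.append(message)

    for reading in ('101', '102', '103'):
        queue.put_nowait(reading)
    queue.put_nowait(None)  # sentinel: stop forwarding

    await forward(queue, fake_send)
    return received


result = asyncio.run(demo())
print(result)
```

Note that the loop only notices a problem with the client when it tries to send. The same is true of producer.py: a disconnected browser is detected the next time a message is published to its channel.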

A quick NGINX configuration

I would create another file in /etc/nginx/conf.d with the below content, and then do service nginx restart to activate these changes. Because the Python servers already occupy ports 8765 and 8767, the Nginx server will listen on 8766 and 8768 for the publisher and subscriber sockets respectively.

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

upstream publisherWebsocket {
    server 127.0.0.1:8765;
}

upstream subscriberSocket {
    server 127.0.0.1:8767;
}


server {
    listen  8766;
    location / {
        proxy_pass http://publisherWebsocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }
}

server {
    listen  8768;
    location / {
        proxy_pass http://subscriberSocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }
}

Debugging

We've written all these code snippets, and at this point are not experiencing any glaring runtime errors (hopefully). How can we tell if everything is connected up correctly? You can try out the light-sensor-facing part of the setup by setting the PUBLISH_SOCKET_LINK environment variable on the Pi to the VPS's IP address or DNS name at port 8766 with a specific path (e.g. ws://example.com:8766/<path>), running the consumer server, and then using redis-cli to SUBSCRIBE to the channel lightlevel/<path>, which is the name the consumer builds from that path. If you see a stream of numbers corresponding to the light levels, congratulations! You can then try starting the producer server to expose the data to potential subscribers (which will connect to ws://example.com:8768/<path>)!
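If you'd rather check the subscriber side from Python than from a browser, a throwaway client along these lines should do. This is a sketch, not part of the original setup: socket_url and listen are hypothetical helper names, example.com and the office path are placeholders, and the client assumes the websockets library from requirements.txt is installed wherever you run it.

```python
def socket_url(host, port, path):
    """Build a ws:// URL; the port goes after the host and before the path."""
    return 'ws://{}:{}/{}'.format(host, port, path.lstrip('/'))


async def listen(url, count=5):
    """Print the first `count` messages received from the subscriber socket."""
    # Third-party dependency, imported here so socket_url works without it
    import websockets

    ws = await websockets.connect(url)
    try:
        for _ in range(count):
            print(await ws.recv())
    finally:
        await ws.close()


print(socket_url('example.com', 8768, 'office'))
```

To actually listen, import asyncio and hand the coroutine to the event loop, for example asyncio.get_event_loop().run_until_complete(listen(socket_url('example.com', 8768, 'office'))). If the sensor is publishing on the same path, light levels should start printing.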

What did we accomplish?

So, provided that everything works you should have a server setup that:

  • Accepts PUBLISH-ing websocket traffic at ws://your.ip.address.here.co:8766/<channelPath>
  • Accepts SUBSCRIBE-ing websocket traffic at ws://your.ip.address.here.co:8768/<channelPath>

The <channelPath> refers to an arbitrary string that when set tells Redis which "channel" to subscribe or publish to. 

Next Steps

Some ideas for firming up this server include:

  • Figuring out how to establish some form of authentication/authorization à la somewhat best practices.
  • Automating server setup with an Ansible Playbook (or at the very least using shell scripts)
  • Traffic encryption with SSL, converting the protocol from ws:// to wss://

As for the front-end side of things, it seems that all there is left to do is write some subscriber clients to start doing things with that sweet, sweet data. Happy building!

is a Software Engineer at Yeti. When he isn't baking bread, or messing with Raspberry Pis, he's dreaming about a future in which humanity is governed by benevolent robotic dictators. He went to school for pre-med and linguistics at the University of Florida and is hoping one day to make his parents proud by working on something tangentially related to either subject.
