Part 3: Connecting and managing multiple websocket streams in parallel

in #utopian-io5 years ago (edited)

banner.png


Repository

https://github.com/python

What will I learn

  • Creating websocket threads
  • Managing the websocket threads
  • Managing multiple orderbooks
  • Locking
  • Accessing data from the main thread
  • Ping/pong

Requirements

  • Python 3.7.2
  • Pipenv
  • websocket-client
  • requests

Difficulty

  • intermediate

Tutorial

Preface

Websockets allow for real time updates while putting less stress on the servers than API calls would. They are especially useful when data is updated frequently, like trades and the orderbooks on crypto currency exchanges. This tutorial will look at how to connect to multiple websockets in parallel. It's a direct continuation on the previous two parts, if anything is unclear refer to the previous tutorials.

Setup

Download the files from Github and install the virtual environment

$ cd ~/
$ git clone https://github.com/Juless89/tutorials-websockets
$ cd tutorials-websockets
$ pipenv install
$ pipenv shell
$ cd part_3

Creating websocket threads

To allow for multiple websocket streams to be active in 'parallel' the threads library is used. Keep in mind that in Python threads are not real threads. That is, they do not execute at the exact same moment. However, since websockets are mostly dealing with network I/O this is no problem. Real threads can be achieved by using processes.

In order to keep the code clean and dynamic the websocket has been split up into multiple classes. The Client class contains the basic fundamentals in order to connect to a websocket stream, this class inherits from Thread, which is in essence just a class by itself. Then two more classes Huobi and Binance are created which represent the Exchanges. Inside these classes the custom code is put for dealing with those particular exchanges. These classes inherit from Client.

This creates the structure:


Blank Diagram.png

# huobi.py

# inherits from Client
class Huobi(Client):
    # call init from parent class
    def __init__(self, url, exchange):
        super().__init__(url, exchange)

    # convert message to dict, decode, extract top ask/bid
    def on_message(self, message):
        data = loads(gzip.decompress(message).decode('utf-8'))
        
        # extract bids/aks
        if 'tick' in data:
            bid = data['tick']['bids'][0]
            ask = data['tick']['asks'][0]

            print(f'Updated {self.exchange}')
        # respond to ping message
        elif 'ping' in data:
            params = {"pong": "data['ping']"}
            self.ws.send(dumps(params))

    # convert dict to string, subscribe to data streem by sending message
    def on_open(self):
        super().on_open()
        params = {"sub": "market.steembtc.depth.step0", "id": "id1"}
        self.ws.send(dumps(params))

Inheritance is achieved by calling the Class name between () in the class constructor, like: class Huobi(Client). This copies all the code from the parent class. Function and variables can be overwritten by rewriting the functions/variables in the child class. super().<function_name>allow for the original function to be called. Code can be added before or after.

In this example super().__init__(url, exchange) is called in the __init__ function to pass the variables url and exchange to the Client class. on_message and on_open are rewritten, therefor replacing the original functions with the same name from the Client class.

The same is done inside Client, which inherits form Thread.

class Client(threading.Thread):
    def __init__(self, url, exchange):
        super().__init__()

Managing the websocket threads

As Threads are just classes they are created like any other class. However, they require one function the manages the thread. Usually this is start() or run(). In this case run has been set inside Client.

# client.py

# keep connection alive
def run(self):
    while True:
        self.ws.run_forever()

run() is called when calling .start() on the thread class.

# main.py

from binance import Binance
from huobi import Huobi


if __name__ == "__main__":
    # create websocket threads
    binance = Binance("wss://stream.binance.com:9443/ws/steembtc@depth", "Binance")
    huobi = Huobi("wss://api.huobipro.com/ws", "Huobi")

    # start threads
    binance.start()
    huobi.start()


websockets_part3.gif

Managing multiple orderbooks

At the moment there are three active threads. The two exchange websockets and the main thread that created the other two threads. The main thread can be used to process the data from the websockets. For this a shared data structure is needed, as well as a locking mechanism to prevent data corruption.

# data management
lock = threading.Lock()
orderbooks = {
    "Binance": {},
    "Huobi": {},
}

# create websocket threads
binance = Binance(
    url="wss://stream.binance.com:9443/ws/steembtc@depth",
    exchange="Binance",
    orderbook=orderbooks['Binance'],
    lock=lock,
)

huobi = Huobi(
    url="wss://api.huobipro.com/ws",
    exchange="Huobi",
    orderbook=orderbooks['Huobi'],
    lock=lock,
)

Locking

A Lock has two important functions acquire() and release(). By default these are set to blocking, which means that a thread will block until it can acquire a lock. If the lock is already taken it will wait for the lock to be freed. If something goes wrong while the lock is acquired a program can halt if the lock is not released when using blocking statements.

This is prevented with the following code:

lock.acquire()

try:
    # execute code
finally:
    lock.release()

This entire code block can be placed with:

with lock:
    # execute code

This does exactly the same thing and looks as follows in the code:

# binance.py

# Loop through all bid and ask updates, call manage_orderbook accordingly
def process_updates(self, data):
    with self.lock:
        for update in data['b']:
            self.manage_orderbook('bids', update)
        for update in data['a']:
            self.manage_orderbook('asks', update)

Accessing data from the main thread

The same orderbooks dict is also accessible from the main thread and is accessed in the same way. To prevent the thread from keeping the orderbooks locked it sleeps 1 second on every pass. The try/except block is for when the orderbook has not been filled with data yet.

# main.py

# print top bid/ask for each exchange
# run forever
def run(orderbooks, lock):
    while True:
        try:
            with lock:
                # extract and print data
                for key, value in orderbooks.items():
                    bid = value['bids'][0][0]
                    ask = value['asks'][0][0]
                    print(f"{key} bid: {bid} ask: {ask}")
                print()
            time.sleep(1)
        except Exception:
            pass

websockets_part3_w.gif

A different approach would be keep track of when the orderbooks were last updated and only accessing the data if that is the case. This can be achieved by adding a last_update variable to the shared dict and then comparing this to a local last_update in the main thread.

orderbooks = {
    "Binance": {},
    "Huobi": {},
    "last_update": None,  # new
}

def run(orderbooks, lock):
    # local last_update
    current_time = datetime.now()
    while True:
        try:
            # check for new update
            if orderbooks['last_update'] != current_time:
                with lock:
                    # do stuff
   
                    # set local last_update to last_update
                    current_time = orderbooks['last_update']
            time.sleep(0.1)
        except Exception:
            pass

This way only when the there is a new update the main thread will lock the orderbook and process the data. In addition the sleep time has been decreased to 0.1, making processing almost instant. Also, in both the Huobi and Binance classes changes have been made to update the last_update variable after each update.

# init
self.orderbook = orderbook[exchange]
self.last_update = orderbook


# after update
self.last_update['last_update'] = datetime.now()

Ping/pong (amendment to tutorial 1)

In tutorial 1 there was a part about ping messages. After some testing it appears these messages have to be replied to to keep the connection alive.

# huobi.py

# respond to ping message
elif 'ping' in data:
    params = {"pong": "data['ping']"}
    self.ws.send(dumps(params))

Running the code

python main.py


websockets_part3_3.gif

Curriculum


The code for this tutorial can be found on Github!

This tutorial was written by @juliank.

Sort:  

Thank you for your contribution @steempytutorials.

  • Excellent work in the elaboration of this tutorial!

  • Your tutorials are very interesting to show the results in GIFs.

Thank you for your work in developing this tutorial.
Looking forward to your upcoming tutorials.

Your contribution has been evaluated according to Utopian policies and guidelines, as well as a predefined set of questions pertaining to the category.

To view those questions and the relevant answers related to your post, click here.


Need help? Chat with us on Discord.

[utopian-moderator]

Thank you for your review, @portugalcoin! Keep up the good work!

Hi @steempytutorials!

Your post was upvoted by @steem-ua, new Steem dApp, using UserAuthority for algorithmic post curation!
Your post is eligible for our upvote, thanks to our collaboration with @utopian-io!
Feel free to join our @steem-ua Discord server

Congratulations @steempytutorials! You have completed the following achievement on the Steem blockchain and have been rewarded with new badge(s) :

You received more than 4000 upvotes. Your next target is to reach 5000 upvotes.

You can view your badges on your Steem Board and compare to others on the Steem Ranking
If you no longer want to receive notifications, reply to this comment with the word STOP

Vote for @Steemitboard as a witness to get one more award and increased upvotes!

Hey, @steempytutorials!

Thanks for contributing on Utopian.
We’re already looking forward to your next contribution!

Get higher incentives and support Utopian.io!
Simply set @utopian.pay as a 5% (or higher) payout beneficiary on your contribution post (via SteemPlus or Steeditor).

Want to chat? Join us on Discord https://discord.gg/h52nFrV.

Vote for Utopian Witness!

Hi, @steempytutorials!

You just got a 0.09% upvote from SteemPlus!
To get higher upvotes, earn more SteemPlus Points (SPP). On your Steemit wallet, check your SPP balance and click on "How to earn SPP?" to find out all the ways to earn.
If you're not using SteemPlus yet, please check our last posts in here to see the many ways in which SteemPlus can improve your Steem experience on Steemit and Busy.