Connect to multiple STEEM nodes in parallel to validate blocks


<hr />
<h4>Repository</h4>
<p dir="auto"><span><a href="https://github.com/python" target="_blank" rel="noreferrer noopener" title="This link will take you away from hive.blog" class="external_link">https://github.com/python</a></span></p>
<h4>What will I learn</h4>
<ul>
<li>Retrieve blocks via an API</li>
<li>Connect to multiple nodes</li>
<li>Thread synchronisation</li>
<li>Returning data to the main thread</li>
<li>Processing data from the worker threads</li>
</ul>
<h4>Requirements</h4>
<ul>
<li>Python 3.7.2</li>
<li>Urllib3</li>
<li>Pipenv</li>
</ul>
<h4>Difficulty</h4>
<ul>
<li>Intermediate</li>
</ul>
<hr />
<h3>Tutorial</h3>
<h4>Preface</h4>
<p dir="auto">STEEM allows easy access to data from its blockchain via a publicly available API. The downside of relying on a single public API node is that the data it returns can be compromised. When building an application that uses this data for important decisions, for example financial ones, reliability can be increased by connecting to multiple nodes and comparing the data from each node. This way one does not have to set up a full node, which can be expensive.</p>
<h4>Setup</h4>
<p dir="auto">Download the files from GitHub and install the virtual environment:</p>
<pre><code>$ cd ~/
$ git clone https://github.com/Juless89/steem_api_verification.git
$ cd steem_api_verification
$ pipenv install
$ pipenv shell
</code></pre>
<h4>Retrieve blocks via an API</h4>
<p dir="auto">For this tutorial the HTTP library <code>urllib3</code> is used; however, any HTTP library should do the job. To request a block from a STEEM node, a <code>POST</code> request has to be made that contains the following parameters.</p>
<pre><code>PARAMS = {
    "jsonrpc": "2.0",
    "method": "block_api.get_block",
    "params": {"block_num": self.block_num},
    "id": 1
}
</code></pre>
<p dir="auto">Important to note are <code>method</code>, which sets the API function to call, in this case <code>block_api.get_block</code> (more API functions and information can be found in the official <a href="https://developers.steem.io/" target="_blank" rel="noreferrer noopener" title="This link will take you away from hive.blog" class="external_link">API docs</a>), and <code>params</code>, which for this API function sets the block number to be requested. The <code>id</code> is returned as-is and can be used to keep track of which thread made a specific API call.</p>
<p dir="auto">The request can now be made. <code>POST</code> sets the type of request, the URL points at the STEEM node, and <code>PARAMS</code> is passed as the <code>body</code>. The <code>PARAMS</code> dict has to be converted to a string with <code>json.dumps()</code> and encoded with <code>encode()</code>.</p>
<pre><code>http = urllib3.PoolManager()
url = 'https://api.steemit.com'

r = http.request('POST', url, body=json.dumps(PARAMS).encode())
data = json.loads(r.data.decode())
</code></pre>
<p dir="auto">The reply is an encoded string that needs to be decoded with <code>decode()</code> and converted to a dict with <code>json.loads()</code> for easy access. The reply has the following structure, where <code>result</code> contains the block data. For this example the block data has been removed.</p>
<pre><code>{
    'jsonrpc': '2.0',
    'result': {},
    'id': 1
}
</code></pre>
<p dir="auto">When requesting a block that does not exist yet, <code>result</code> will be empty.</p>
<h4>Connect to multiple nodes</h4>
<p dir="auto">This tutorial uses the following nodes. They are not the only ones available and can be swapped out for different ones.</p>
<pre><code>nodes = [
    'api.steemit.com',
    'api.steem.house',
    'appbasetest.timcliff.com',
    'rpc.steemviz.com',
    'steemd.privex.io',
    'rpc.usesteem.com',
]
</code></pre>
<p dir="auto">Creating a class allows for threading and creating separate instances for each node.
To create a thread class, the class must inherit from <code>threading.Thread</code>, and <code>threading.Thread.__init__(self)</code> has to be called inside the <code>__init__</code> function. Each <code>Node</code> instance is initialised with a <code>url</code> and a <code>queue</code>, where <code>url</code> is the node URL and <code>queue</code> is an object used to share data between threads. It includes all locking mechanisms needed to prevent data corruption. Data can be added by calling <code>queue.put()</code> and retrieved by calling <code>queue.get()</code>.</p>
<pre><code>import threading
import queue


class Node(threading.Thread):
    def __init__(self, url, queue):
        threading.Thread.__init__(self)
        self.block_num = None
        self.url = url
        self.queue = queue
        self.http = urllib3.PoolManager()

    def get_block(self):
        # function to retrieve a block from the API
        pass

    def run(self):
        # loop that is entered when the thread is started;
        # it manages the thread
        pass
</code></pre>
<p dir="auto">All the threads are created by looping over the node list. Each thread has a unique <code>url</code> and shares the same <code>queue</code>. The threads are stored inside the <code>threads</code> list. <code>.start()</code> calls the <code>run</code> function inside the class.</p>
<pre><code># threads list and queue object
threads = []
queue = queue.Queue()

# create a worker for each API node, start and store it
for node in nodes:
    worker = Node(node, queue)
    worker.start()
    threads.append(worker)
</code></pre>
<p dir="auto">The node threads will be referred to as worker threads, while the thread that created them will be referred to as the main thread. The processing of the data retrieved from the worker threads is done inside the main thread.</p>
<h4>Thread synchronisation</h4>
<p dir="auto">Threads are separated from each other, which makes communication between them limited. Shared objects like the <code>queue</code> allow data to be exchanged between threads. However, to prevent data corruption when multiple threads write to the same object, locking is implemented, which makes threads have to wait for each other.</p>
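As a minimal, self-contained illustration of this hand-off pattern (hypothetical worker code, not from the tutorial's repository): two threads put results on a shared <code>queue.Queue</code>, whose internal locking makes the transfer safe, and the main thread collects them.

```python
import queue
import threading

# shared, thread-safe channel between workers and the main thread
q = queue.Queue()

def worker(n):
    # each worker computes something and hands it to the main thread
    q.put((n, n * n))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# drain the queue; sort because the arrival order is non-deterministic
results = sorted(q.get() for _ in range(2))
print(results)  # [(0, 0), (1, 1)]
```

Note that no explicit <code>Lock</code> is needed: <code>put()</code> and <code>get()</code> do the locking internally.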
<p dir="auto">To avoid this locking, a different solution is used to tell the threads which block to retrieve. A global variable <code>block_num</code> is initialised in the main file. From within the class this variable can then be accessed by declaring <code>global block_num</code>.</p>
<pre><code>def run(self):
    # global current block counter
    global block_num

    # check if the global block counter has changed;
    # if so, retrieve the block via the API and put it into the queue
    while True:
        if block_num != self.block_num:
            self.block_num = block_num
            if block_num is not None:
                block = self.get_block()
                self.queue.put(block)
        time.sleep(.1)
</code></pre>
<p dir="auto">When the thread is initialised it declares <code>global block_num</code>. A loop then compares the global <code>block_num</code> with the thread's local <code>block_num</code>. When they differ, the block is retrieved and put into the queue for the main thread to process. This works because the main thread is the only one changing the global <code>block_num</code>; the worker threads only read it. After each iteration the worker thread sleeps for 0.1 seconds, which reduces stress on the CPU.</p>
<h4>Returning data to the main thread</h4>
<p dir="auto">When data is stored inside the queue, the main thread does not know which thread stored it. Therefore a tuple is returned which contains the <code>block_num</code>, the node <code>url</code> and the <code>block</code> data. In case of any exceptions, <code>None</code> is returned instead of the <code>block</code> data. This guarantees a response from each worker thread.</p>
<pre><code># Perform an API call to get a block. Return None for non-existing blocks
# and any exceptions.
def get_block(self):
    # API request
    PARAMS = {
        "jsonrpc": "2.0",
        "method": "block_api.get_block",
        "params": {"block_num": self.block_num},
        "id": 1
    }
    url = 'https://' + self.url

    try:
        # perform and decode the request
        r = self.http.request('POST', url, body=json.dumps(PARAMS).encode())
        data = json.loads(r.data.decode())

        # empty result for blocks that do not exist yet
        if len(data['result']) == 0:
            return (self.block_num, url, None)

        return (self.block_num, url, data)
    # return None for any exceptions
    except Exception:
        return (self.block_num, url, None)
</code></pre>
<p dir="auto">The tuple is then put into the queue.</p>
<pre><code>block = self.get_block()
self.queue.put(block)
</code></pre>
<h4>Processing data from the worker threads</h4>
<p dir="auto">The main thread waits until the queue reaches the same size as the number of nodes. This works because every worker thread is set up to reply, even when the API call fails.</p>
<pre><code>while True:
    # when the queue reaches the same size as the amount of nodes,
    # empty the queue
    if queue.qsize() == len(nodes):
        # process the replies
        pass
    else:
        time.sleep(0.1)
</code></pre>
<p dir="auto">All replies are removed from the queue, checked to be valid, and, if so, stored inside <code>storage</code>.</p>
<pre><code>storage = []

# check if the block has been retrieved successfully; if so, store it
for x in range(len(nodes)):
    block = queue.get()

    if block[2] is not None:
        storage.append(block)
    else:
        print(block[1], block[0], 'Not valid')
</code></pre>
<p dir="auto">The order in which the worker threads reply is non-deterministic, that is, effectively random. To keep comparing all blocks simple, each block is compared against the first block received. In case the first block is corrupted, the block will be retried. For every block that is the same, the score is increased by 1.</p>
<pre><code># Extract and compare block data
data = [x[2] for x in storage]
score = 0

# Check the block data against the first block that has been
# received. As this is non-deterministic, in case of a corrupted
# block the order should be different when a retry is needed.
for x in data[1:]:
    if x == data[0]:
        score += 1
</code></pre>
<p dir="auto">When all blocks have been compared, the final score is calculated. For this example a threshold of 0.75 is selected, meaning that if 75% of the received blocks are the same, the main thread moves on to the next block.</p>
<pre><code># Calculate how many blocks are the same
final_score = (score+1)/len(nodes)
print(f'Block: {block_num} Score: {final_score}')

# When the score is higher than a certain threshold, go on to the
# next block. If not, reset the global block_num, wait for the workers
# to update and retry.
if final_score > 0.75:
    block_num += 1
else:
    temp = block_num
    block_num = None
    time.sleep(0.5)
    block_num = temp
</code></pre>
<p dir="auto">Since the worker threads only check for a difference in the global <code>block_num</code>, retrying the same block would fail that check. Therefore, <code>block_num</code> is first set to <code>None</code>; the main thread then waits for the workers to pick up the change and sets <code>block_num</code> back to its old value.</p>
<h4>Running the code</h4>
<p dir="auto">The main file takes one argument: the block number.</p>
<pre><code>python main.py 25000000
</code></pre>
<p dir="auto"><img src="https://images.hive.blog/768x0/https://files.steempeak.com/file/steempeak/steempytutorials/Rpah37ji-Screenshot202019-03-212015.11.51.png" alt="Screenshot 20190321 15.11.51.png" srcset="https://images.hive.blog/768x0/https://files.steempeak.com/file/steempeak/steempytutorials/Rpah37ji-Screenshot202019-03-212015.11.51.png 1x, https://images.hive.blog/1536x0/https://files.steempeak.com/file/steempeak/steempytutorials/Rpah37ji-Screenshot202019-03-212015.11.51.png 2x" /></p>
<p dir="auto">A "Not valid" block does not necessarily mean the node corrupted the block; it can also mean the node failed to deliver it, as the code does not distinguish between the two cases.</p>
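The scoring step can also be sketched as a small standalone function (hypothetical names; the tutorial keeps this logic inline in the main loop):

```python
# Hypothetical standalone version of the scoring step: compare every
# successfully retrieved block against the first one and return the
# fraction of nodes that agree.
def consensus_score(blocks, total_nodes):
    # blocks: replies that were retrieved successfully
    if not blocks:
        return 0.0
    score = sum(1 for block in blocks[1:] if block == blocks[0])
    # the +1 counts the reference block itself
    return (score + 1) / total_nodes

# 4 of 6 nodes replied; 3 of the replies agree with the first one
replies = [{'block_id': 'abc'}, {'block_id': 'abc'},
           {'block_id': 'xyz'}, {'block_id': 'abc'}]
print(consensus_score(replies, 6))  # 0.5, below the 0.75 threshold -> retry
```

Dividing by the total number of nodes rather than the number of replies means that nodes which failed to answer count against the score, just as in the tutorial's main loop.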
<hr />
<p dir="auto">The code for this tutorial can be found on <a href="https://github.com/Juless89/steem_api_verification.git" target="_blank" rel="noreferrer noopener" title="This link will take you away from hive.blog" class="external_link">Github</a>!</p>
<p dir="auto"><span>This tutorial was written by <a href="/@juliank">@juliank</a>.</span></p>