Skip to main content
added 179 characters in body
Source Link

Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :

import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware # only needed for PoA networks like BSC import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) # pool connections and max size are for HTTP calls only, since we are using WS they are not needed.  session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://<Provider>")) w3.middleware_onion.inject(geth_poa_middleware, layer=0)   # only needed for PoA networks like BSC async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event()) 

after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.

Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :

import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://<Provider>")) w3.middleware_onion.inject(geth_poa_middleware, layer=0)   async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event()) 

after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.

Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :

import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware # only needed for PoA networks like BSC import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) # pool connections and max size are for HTTP calls only, since we are using WS they are not needed.  session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://<Provider>")) w3.middleware_onion.inject(geth_poa_middleware, layer=0) # only needed for PoA networks like BSC async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event()) 

after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.

deleted 6 characters in body
Source Link

Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :

import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://144.91.76.9:8547"<Provider>")) w3.middleware_onion.inject(geth_poa_middleware, layer=0) async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event()) 

after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.

Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :

import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://144.91.76.9:8547")) w3.middleware_onion.inject(geth_poa_middleware, layer=0) async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event()) 

after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.

Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :

import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://<Provider>")) w3.middleware_onion.inject(geth_poa_middleware, layer=0) async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event()) 

after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.

Source Link

Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :

import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://144.91.76.9:8547")) w3.middleware_onion.inject(geth_poa_middleware, layer=0) async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event()) 

after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.