Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :
import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware # only needed for PoA networks like BSC import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) # pool connections and max size are for HTTP calls only, since we are using WS they are not needed. session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://<Provider>")) w3.middleware_onion.inject(geth_poa_middleware, layer=0) # only needed for PoA networks like BSC async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event())
after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.