1

i am trying to listen to log events via my node's websocket server but i can't seem to find a way to do that using the web3Py lib. the asynchronous implementation of the docs is basically a threaded loop that queries the node every amount of time. this would work if i have a single contract or even to look at but when having multiple filters to look at individually, this can be very resource consuming.

is there a way to implement the websocket log filtering with web3py, considering it is available in the web3js implementation?

2 Answers 2

5

Web3.py doesn't offer methods to subscribe automatically so we have to manually listen to the websocket and use rpc methods to send the subscription to logs :

import asyncio import json from web3 import Web3 from web3.middleware import geth_poa_middleware # only needed for PoA networks like BSC import requests from websockets import connect from eth_abi import decode_single, decode_abi adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) # pool connections and max size are for HTTP calls only, since we are using WS they are not needed. session = requests.Session() w3 = Web3(Web3.WebsocketProvider("ws://<Provider>")) w3.middleware_onion.inject(geth_poa_middleware, layer=0) # only needed for PoA networks like BSC async def get_event(): async with connect("ws://localhost:8545") as ws: await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", { "address": ['0x15c921AF5F49a42......'], "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]})) subscription_response = await ws.recv() print(subscription_response) while True: try: message = await asyncio.wait_for(ws.recv(), timeout=60) decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:])) print(list(decoded)) pass except: pass if __name__ == "__main__": loop = asyncio.get_event_loop() while True: loop.run_until_complete(get_event()) 

after getting the data for the logs we decode them using eth_abi and extract the log data. this is a much better option than creating a web3 contract and waiting for the receipt to get the logs that we will have to filter based on topics.

0

You can also use web3-proxy-providers and subscribe to the event

web3-proxy-providers is a python package for connecting to HTTP and WebSocket Json-RPC using Socks and HTTP proxies

Sample Code:

import asyncio from Crypto.Hash import keccak from web3 import Web3 from python_socks import ProxyType from web3_proxy_providers import AsyncSubscriptionWebsocketWithProxyProvider async def callback(subs_id: str, json_result): print(json_result) async def main(loop: asyncio.AbstractEventLoop): provider = AsyncSubscriptionWebsocketProvider( loop=loop, endpoint_uri='wss://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>', ) # subscribe to Deposit and Withdrawal events for WETH contract weth_contract_address = Web3.to_checksum_address('0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2') deposit_topic = "0x" + keccak.new(data=b'Deposit(address,uint256)', digest_bits=256).hexdigest() withdrawal_topic = "0x" + keccak.new(data=b'Withdrawal(address,uint256)', digest_bits=256).hexdigest() subscription_id = await provider.subscribe( [ 'logs', { "address": weth_contract_address, "topics": [deposit_topic, withdrawal_topic] } ], callback ) print(f'Subscribed with id {subscription_id}') print(f'Subscribed with id {subscription_id}') # unsubscribe after 30 seconds await asyncio.sleep(30) await provider.unsubscribe(subscription_id) if __name__ == '__main__': async_loop = asyncio.get_event_loop() async_loop.run_until_complete(main(loop=async_loop)) 

Disclaimer: I am the owner of this package https://github.com/sinarezaei/web3-proxy-providers

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.