0

I have a class that create a url and some json to execute in a post method that looks like that and I was following this guide

import vk_api from vk_api.execute import VkFunction import time from datetime import datetime import numpy as np import asyncio from ratelimit import limits import requests import aiohttp class Execute: def __init__(self, access_token): self.access_token = access_token def posts_to_push(self, posts, limit): arr = [] data = list(self.posts_chunks_limit(posts, limit)) for i in range(len(data)): code = f"data.push(API.wall.getById( {{'posts': {data[i]} }} )); " arr.append(code) return arr def posts_execute_command(self, posts): # TODO make async limit = 100 code = self.posts_to_push(posts, limit) execute_limit = 25 for i in range(len(code)): data = ''.join(code[i * execute_limit: (i * execute_limit) + execute_limit]) var = f'var data = []; {data} return data ;' yield var async def fetch(url, json_data, session): async with session.post(url, json=json_data) as response: return await response.read() async def result_posts(self, posts): result = [] command = self.posts_execute_command(posts) async with aiohttp.ClientSession() as session: for i in command: execute = asyncio.ensure_future(self.fetch(url="https://api.vk.com/method/execute", json_data={ "code": i, "access_token": self.access_token, "v": 5.101, }), session) result.append(execute) responses = await asyncio.gather(*result) print(responses) async def posts_chunks_limit(self, data, limit): """Yield successive n-sized chunks from l.""" for i in range(0, len(data), limit): await asyncio.sleep(0.1) yield data[i:i + limit] def run_async(self, posts): loop = asyncio.get_event_loop() loop.run_until_complete(self.result_posts(posts)) 

and then i run it like this

df = pd.read_csv('/some_path') arr = [] for i in df['ids']: arr.append(i) loop = asyncio.get_event_loop() future = asyncio.ensure_future(vk.result_posts(arr)) loop.run_until_complete(future) 

error message looks like this

Traceback (most recent call last): File "../test_python.py", line 83, in <module> loop.run_until_complete(future) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete return future.result() File "../test_python.py", line 45, in result_posts for i in command: File "../test_python.py", line 29, in posts_execute_command code = self.posts_to_push(posts, limit) File "../test_python.py", line 21, in posts_to_push data = list(self.posts_chunks_limit(posts, limit)) TypeError: 'async_generator' object is not iterable 

This is my frist time using aiohttp/asyncio, I find it quite complicated and easy to get lost, may be I can get some directions or solutions in my case ?

2 Answers 2

4

In this line:

 data = list(self.posts_chunks_limit(posts, limit)) 

As post_chunks_limit is an async iterator, list doesn't know what to do with it. You need to iterate over it with async for or with an async list comprehension:

 data = [x async for x in self.posts_chunks_limit(posts, limit)] 

This requires, posts_to_push and posts_execute_command to be defined with async def. Also posts_execute_command must await the call to posts_to_push and result_posts needs to await the call to posts_execute_command.

Sign up to request clarification or add additional context in comments.

5 Comments

I have python 3.6 if that makes a difference, since in library that looks like does make a difference
@nexla The problem is that posts_to_push wasn't async def. I've now amended the answer with additional information.
I get an error TypeError: object async_generator can't be used in 'await' expression
However, i am quite certain that my issue is with the posts_execute_command method, because if I print or return value instead of yield it works, but obviously not correct :( Sorry it might be ridiculous , but im just grasping the concept of asyncronous programming in python and asyncio library :/
I have tried using @asyncio.coroutine decorator on methods i am using yield, i get TypeError: object of type 'coroutine' has no len()
1

With the help of @user4815162342 and bunch of SO posts, I was able to fix my issue and my code looks like this. Issue was I was calling/awaiting a generator which would not be iterable in my result_postsmethod.

import vk_api from vk_api.execute import VkFunction import time from datetime import datetime import numpy as np import asyncio from ratelimit import limits import requests import aiohttp import socket from concurrent.futures import ThreadPoolExecutor class Execute: # TODO auth, parsers, limits, timeouts def __init__(self, access_token): self.access_token = access_token async def posts_to_push(self, posts, limit): arr = [] data = [x async for x in self.posts_chunks_limit(posts, limit)] for i in range(len(data)): code = f"data.push(API.wall.getById( {{'posts': {data[i]} }} )); " arr.append(code) return arr # < len() = 1000, 1k lists with 100 post IDs inside for 100k total ids async def posts_execute_command(self, posts): # TODO make async limit = 100 code = await self.posts_to_push(posts, limit) execute_limit = 25 for i in range(len(code)): data = ''.join(code[i * execute_limit: (i * execute_limit) + execute_limit]) var = f'var data = []; {data} return data ;' print(var, '---var---') yield var async def fetch(self, url, json_data, session): async with session.post(url, data=json_data) as response: return await response.read() @limits(calls=1, period=1) async def result_posts(self, posts): result = [] command = [i async for i in self.posts_execute_command(posts) ] #<note this iteration conn = aiohttp.TCPConnector( family=socket.AF_INET, verify_ssl=False,) async with aiohttp.ClientSession(connector=conn) as session: for i in command: print('---code---', len(command)) #TODO fix command range that's the bug execute = asyncio.ensure_future(self.fetch(url="https://api.vk.com/method/execute", json_data={ "code": i, "access_token": self.access_token, "v": 5.101, }, session = session)) await asyncio.sleep(1) result.append(execute) responses = await asyncio.gather(*result) print(responses, 'responses') return 'Done' async def posts_chunks_limit(self, data, limit): """Yield successive n-sized chunks from l.""" for i in range(0, len(data), limit): yield data[i:i + limit] 

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.