I'm creating an automation script for Google Workspace which fetches the direct children of some organizational unit and then fetches the children of all of those OUs at the same time. While searching the web for an answer to whether multiprocessing, threading, or asynchronous processing would work best for me, I concluded that asyncio would help with this. I have created a class, GoogleTenant, which holds the connection to the Google API and the fetched users. However, my problem right now is that the script still is not asynchronous: it works in sequence rather than making the calls concurrently.
from google.oauth2 import service_account from googleapiclient.discovery import build import logging import asyncio class GoogleTenant: def __init__(self, api: str, version: str): config: ScriptConfig = ScriptConfig() credentials = service_account.Credentials.from_service_account_file(config["gcloud"]["keypath"], scopes=SCOPES) delegated_credentials = credentials.with_subject(config["gcloud"]["subject"]) self.service = build(api, version, credentials=delegated_credentials) self.users_list = [] def fetch_users(self): users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute() ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]] asyncio.run(self._crawl_ous(ous_list)) async def _crawl_ous(self, ous: list): crawling_result = await asyncio.gather(*[asyncio.create_task(self._fetch_users_from_ou(ou)) for ou in ous]) for result in crawling_result: logging.info(f"Crawling result of ou {result[0]["organizations"][0]["department"]}: {len(result)}") self.users_list.extend(result) async def _fetch_users_from_ou(self, ou): call_parameters = { "customer": "my_customer", "maxResults": 500, "projection": "basic", "query": f"orgUnitPath='{str(ou)}'", "fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken" } logger.debug(f"Fetching users from {ou}") users_from_ou = self.service.users().list(**call_parameters).execute() user_fetching_result: list = users_from_ou["users"] logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}") if "nextPageToken" in users_from_ou: next_page_token = users_from_ou["nextPageToken"] else: return user_fetching_result while True: users_from_ou = self.service.users().list(**call_parameters, pageToken=next_page_token).execute() logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}") 
user_fetching_result.extend(users_from_ou["users"]) if "nextPageToken" in users_from_ou: next_page_token = users_from_ou["nextPageToken"] else: return user_fetching_result if __name__ == '__main__': google_tenant = GoogleTenant("admin", "directory_v1") google_tenant.fetch_users() The execution result of the following:
DEBUG:root:Fetching users from /example/child1
DEBUG:root:Initial fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 258
DEBUG:root:Fetching users from /example/child2
DEBUG:root:Initial fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
...

I've tried inserting the await statement in various places; however, I seem to misunderstand how it works. As I understand it, the await statement makes the function wait for the result before continuing execution. How can I make Python execute these calls concurrently?
Update 1
I reformatted parts of the code as per @Michael Butscher's suggestion, and also added my imports to the previous block.
def fetch_users(self):
    """Fetch the direct child OUs of /example, then crawl all of them concurrently."""
    users_ous = self.service.orgunits().list(
        customerId="my_customer",
        orgUnitPath="/example",
        type="children",
        fields="organizationUnits/orgUnitPath").execute()
    ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
    logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}")
    asyncio.run(self._crawl_ous(ous_list))

async def _crawl_ous(self, ous: list):
    """Run one proxy task per OU concurrently and collect the users."""
    tasks = [self._crawler_proxy(ou) for ou in ous]
    crawling_result = await asyncio.gather(*tasks)
    for result in crawling_result:
        logger.info(f"Crawling result: {len(result)}")
        self.users_list.extend(result)

async def _crawler_proxy(self, *args, **kwargs):
    """Bridge the blocking API call onto a worker thread.

    BUG in the previous version: `run_in_executor` was handed
    `self._fetch_users_from_ou(*args, **kwargs)` — i.e. the function was
    *called immediately* (and therefore synchronously, right here on the
    event loop) and its result was passed to the executor. That is why the
    change had "no effect on the execution". `run_in_executor` needs a
    callable; wrap the call in a zero-argument lambda so the executor
    thread performs the blocking work.
    """
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None, lambda: self._fetch_users_from_ou(*args, **kwargs))
Comment thread: "Should I ditch `asyncio` and rather use `threading` to execute these function calls using multiple threads?" — "If you use an async-capable HTTP library (e.g. `requests-async`), then yes. Instead of using `threading` directly, you may use an `Executor` from `concurrent.futures` or `multiprocessing.pool.ThreadPool`." — "I'm using `google-api-python-client`, which I forgot to include in the post, for which I'm sorry. However, based on your suggestion and as per this link I modified my code, yet I see no effect on the execution."