0

I'm creating an automation script for Google Workspace where it fetches an direct children of some organization unit and then fetches children of all these OUs at the same time. While searching the web for the answer whether multiprocessing, threading or asynchronous processing will work best for me I understood that asyncio is going to help me with this issue. I have created a class Google Tenant which holds the connection to google api and the fetched users. However, my problem right now is that the script still is not asynchronous but it works in sequence rather than making the calls asynchronously

from google.oauth2 import service_account from googleapiclient.discovery import build import logging import asyncio class GoogleTenant: def __init__(self, api: str, version: str): config: ScriptConfig = ScriptConfig() credentials = service_account.Credentials.from_service_account_file(config["gcloud"]["keypath"], scopes=SCOPES) delegated_credentials = credentials.with_subject(config["gcloud"]["subject"]) self.service = build(api, version, credentials=delegated_credentials) self.users_list = [] def fetch_users(self): users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute() ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]] asyncio.run(self._crawl_ous(ous_list)) async def _crawl_ous(self, ous: list): crawling_result = await asyncio.gather(*[asyncio.create_task(self._fetch_users_from_ou(ou)) for ou in ous]) for result in crawling_result: logging.info(f"Crawling result of ou {result[0]["organizations"][0]["department"]}: {len(result)}") self.users_list.extend(result) async def _fetch_users_from_ou(self, ou): call_parameters = { "customer": "my_customer", "maxResults": 500, "projection": "basic", "query": f"orgUnitPath='{str(ou)}'", "fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken" } logger.debug(f"Fetching users from {ou}") users_from_ou = self.service.users().list(**call_parameters).execute() user_fetching_result: list = users_from_ou["users"] logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}") if "nextPageToken" in users_from_ou: next_page_token = users_from_ou["nextPageToken"] else: return user_fetching_result while True: users_from_ou = self.service.users().list(**call_parameters, pageToken=next_page_token).execute() logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}") user_fetching_result.extend(users_from_ou["users"]) if "nextPageToken" in users_from_ou: next_page_token = users_from_ou["nextPageToken"] else: return user_fetching_result if __name__ == '__main__': google_tenant = GoogleTenant("admin", "directory_v1") google_tenant.fetch_users() 

The execution result of the following:

DEBUG:root:Fetching users from /example/child1 DEBUG:root:Initial fetch from /example/child1: 500 DEBUG:root:Next fetch from /example/child1: 500 DEBUG:root:Next fetch from /example/child1: 500 DEBUG:root:Next fetch from /example/child1: 258 DEBUG:root:Fetching users from /example/child2 DEBUG:root:Initial fetch from /example/child2: 500 DEBUG:root:Next fetch from /example/child2: 500 DEBUG:root:Next fetch from /example/child2: 500 ... 

I've tried to enter the await statement in places however I seem to misunderstand how it should as per my understanding the await statement makes the function wait for the result before continuing function execution. How can I make python execute these concurrently?

Update 1

I reformatted parts of the code as per @Michael Butscher suggestion and also added my imports in the previous block

 def fetch_users(self): users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute() ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]] logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}") asyncio.run(self._crawl_ous(ous_list)) async def _crawl_ous(self, ous: list): tasks = [self._crawler_proxy(ou) for ou in ous] crawling_result = await asyncio.gather(*tasks) for result in crawling_result: logger.info(f"Crawling result: {len(result)}") self.users_list.extend(result) async def _crawler_proxy(self, *args, **kwargs): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, self._fetch_users_from_ou(*args, **kwargs)) 
9
  • It seems you use a library which does the actual API calls. If that library doesn't support asynchronous calls, you can only use run_in_executor (meaning multithreading or maybe multiprocessing) which seems not really better than using an executor directly without using asynchronous code in your case. Commented Jan 12, 2024 at 11:46
  • So if I understand correctly yo usuggest that I should give up asyncio and rather use threading to execute these function calls using multiple threads? Commented Jan 12, 2024 at 13:04
  • If your library doesn't provide async functions and you can't (or don't want to) find another library or do the API calls directly with an async function or library (e.g. requests variant requests-async) then yes. Commented Jan 12, 2024 at 13:08
  • Instead of using threading directly you may use an Executor from concurrent.futures or multiprocessing.pool.ThreadPool. Commented Jan 12, 2024 at 13:13
  • I would like to use this library nonetheless as it is google's supported library google-api-python-client which I forgot to include in the post, for which I'm sorry. However based on your suggestion and as per this link I modified my code yet I see no effect on the execution . Commented Jan 12, 2024 at 14:04

2 Answers 2

1

Your main culprit is coroutine _fetch_users_from_ou. It does not have any await statement inside, so it means it will not allow any other task to run in between until this function executes, thus all of them run sequentially in asyncio.gather call inside your _crawl_ous.

You would want these changes:

users_from_ou = await asyncio.to_thread( self.service.users().list(**call_parameters).execute)

users_from_ou = await asyncio.to_thread( self.service.users().list(**call_parameters, pageToken=next_page_token).execute)

Assuming that you have python 3.9 or above and more about what this does: https://docs.python.org/3/library/asyncio-task.html#running-in-threads

Sign up to request clarification or add additional context in comments.

2 Comments

So I implemented your suggestion and the effect was that all calls to _fetch_users_from_ou start at the same time however they fall into some kind of inifite loop as the first call users_from_ou = await asyncio.to_thread( self.service.users().list(**call_parameters).execute) doesn't move further wit the code
could be that self.service.user().list(**call_paremeters).execute is not thread safe. You can try to force it to only execute one call and see if it goes through. To try to go around the problem of self.service.users() not being thread safe, you can make it so that self.service is not used instead it is built on the fly in the function, something like: def make_call(*different_args): service = build(api, version, credentials=delegated_credentials) ...do the calls you need return result... But I am not sure here either.
0

Once again thank you @Juris and @Michael Butscher for the provided suggestions and help. In the end I used different approach. The simplest solution in this case turned out to be multiprocessing. I used the multiprocessing.Pool().map() function. Below is the modified code.

 def fetch_users(self) -> None: config = ScriptConfig() users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute() ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]] logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}") self._crawl_ous(ous_list) logger.info(f"Fetched users: {len(self.users_list)}") def _crawl_ous(self, ous: list[str]) -> None: users_list = [] with multiprocessing.Pool(processes=30) as pool: crawling_results = pool.map(self._fetch_users_from_ou, ous) for result in crawling_results: logger.info(f"Crawling result: {len(result)}") self.users_list.extend(result) def _fetch_users_from_ou(self, ou): call_parameters = { "customer": "my_customer", "maxResults": 500, "projection": "basic", "query": f"orgUnitPath='{str(ou)}'", "fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken" } logger.info(f"Fetching users from {ou}") users_from_ou: dict = self.service.users().list(**call_parameters).execute() logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}") user_fetching_result: list = users_from_ou["users"] if "nextPageToken" in users_from_ou: next_page_token: str = users_from_ou["nextPageToken"] else: return user_fetching_result while True: users_from_ou: dict = self.service.users().list(**call_parameters, pageToken=next_page_token).execute() logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}") user_fetching_result.extend(users_from_ou["users"]) if "nextPageToken" in users_from_ou: next_page_token: str = users_from_ou["nextPageToken"] else: return user_fetching_result 

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.