Is it possible to use OWSLib with aiohttp?

Is there a Python library for formatting and parsing WPS requests (and those of other OGC web services, such as WMS, the Web Map Service) that also supports IO coroutines (using async/await syntax)? Note that asynchronous coroutines were added to the Python language specifically to improve the performance of concurrent IO, particularly when performing many web requests in parallel.

For example, is there a way to still utilise OWSLib (which already understands the different OWS specs) to format requests and to parse responses, but without letting OWSLib actually dispatch and wait for those requests itself (because OWSLib ordinarily uses requests to perform blocking IO)?

  • Have you ever found an answer to your question? Commented Mar 29, 2023 at 18:10
  • @MorningGlory no; I think OWSLib lacked official support for async (and this seems not to have changed since), so I considered resorting to monkey-patching (intercepting) its calls to the underlying HTTP request libraries, or else just using aiohttp directly with no OWS-specific higher-level wrapper. (FWIW, I think WPS is inherently prone to poor scaling/concurrency anyway.) Commented Mar 29, 2023 at 23:06
  • Good to know. Thanks! Commented Mar 31, 2023 at 13:19

1 Answer

Here's my approach to the problem, although my need for OWSLib is very specific, so it may not apply to your situation. My objective was to extract weather forecast data at a specific point using OWSLib's WebMapService. As far as I know, this is the only way to achieve this with Environment Canada forecasts. To get the data, I make a request through wms.getfeatureinfo(). This returns a response wrapper object that can be read and decoded without asyncio. To make it work with asyncio, however, I call the wrapper's .geturl() method, which returns the query URL, and then fetch that URL with async with session.get(url) as resp to obtain the data.
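One caveat: as far as I can tell from the OWSLib source, wms.getfeatureinfo() already sends a blocking request before it returns the wrapper, so the URL obtained from .geturl() ends up being fetched a second time by aiohttp. If that matters, the GetFeatureInfo URL can be assembled by hand from the standard WMS 1.3.0 parameters instead. The following is only a minimal sketch under that assumption, not part of OWSLib's API: build_getfeatureinfo_url is a hypothetical helper, and the layer name, bounding box and time are illustrative values taken from this answer.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_getfeatureinfo_url(base_url, layer, bbox, time_iso,
                             size=(100, 100), xy=(1, 1),
                             info_format="text/plain"):
    """Assemble a WMS 1.3.0 GetFeatureInfo URL without sending it.

    bbox is (min_lon, min_lat, max_lon, max_lat). WMS 1.3.0 expects
    latitude first for EPSG:4326, so the axes are swapped here.
    """
    min_lon, min_lat, max_lon, max_lat = bbox
    params = {
        "SERVICE": "WMS",
        "VERSION": "1.3.0",
        "REQUEST": "GetFeatureInfo",
        "LAYERS": layer,
        "QUERY_LAYERS": layer,
        "STYLES": "",
        "CRS": "EPSG:4326",
        "BBOX": f"{min_lat},{min_lon},{max_lat},{max_lon}",
        "WIDTH": size[0],
        "HEIGHT": size[1],
        "FORMAT": "image/jpeg",
        "INFO_FORMAT": info_format,
        "I": xy[0],   # pixel column of the queried point
        "J": xy[1],   # pixel row of the queried point
        "FEATURE_COUNT": 1,
        "TIME": time_iso,
    }
    separator = "&" if "?" in base_url else "?"
    return base_url + separator + urlencode(params)


# Illustrative values only (first station and layer from this answer).
url = build_getfeatureinfo_url(
    "https://geo.weather.gc.ca/geomet",
    "HRDPS.CONTINENTAL_TT",
    (-71.8337, 45.1608, -71.7337, 45.2608),
    "2023-03-31T13:00:00Z",
)
# Decode the query string again to inspect what would be sent.
query = parse_qs(urlsplit(url).query, keep_blank_values=True)
print(query["BBOX"], query["TIME"])
```

The resulting string can be passed to session.get(url) in the same way as the URL returned by .geturl().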

Here is essentially the function I use with OWSLib. Much of it is boilerplate for handling edge cases.

    import asyncio
    import re
    import warnings
    from datetime import datetime

    import aiohttp
    import numpy as np
    import pandas as pd
    from owslib.util import ServiceException
    from owslib.wms import WebMapService

    warnings.filterwarnings("ignore")


    async def request(layer: str, time: list, coor: list) -> list:
        # `time` is a list of datetime objects; `wms` is the module-level
        # WebMapService instance created in the reproducible code below.
        pixel_value = []
        # Create an aiohttp client session for making HTTP requests
        async with aiohttp.ClientSession() as session:
            # Iterate through each timestep
            for timestep in time:
                try:
                    # Get the request URL using the OWSLib library
                    url = wms.getfeatureinfo(layers=[layer],
                                             srs='EPSG:4326',
                                             bbox=tuple(coor),
                                             size=(100, 100),
                                             format='image/jpeg',
                                             query_layers=[layer],
                                             info_format='text/plain',
                                             xy=(1, 1),
                                             feature_count=1,
                                             time=timestep.isoformat() + 'Z'
                                             ).geturl()
                    # Make an asynchronous GET request to the URL
                    async with session.get(url) as resp:
                        # Check if the response status is OK (200)
                        if resp.status == 200:
                            # Read the text content of the response
                            text = await resp.text()
                            # Extract the value using a regex pattern
                            pixel_value.append(str(re.findall(r'value_0\s+\d*.*\d+', text)))
                            try:
                                # Convert the extracted value to a float
                                pixel_value[-1] = float(
                                    re.sub('value_0 = \'', '', pixel_value[-1])
                                    .strip('[""]')
                                )
                            except ValueError:
                                # Extraction failed (most likely an empty response)
                                print(f'Problem with the extracted data (most likely empty output) '
                                      f'at time = {timestep} and layer = {layer}')
                                print('Returning NaN instead')
                                pixel_value[-1] = np.nan
                        else:
                            # If the response status is not OK, print an error message
                            print(f'Request could not be made for some reason at time = {timestep} and layer = {layer}')
                            pixel_value.append(np.nan)
                except (ServiceException, aiohttp.ClientError):
                    # Handle errors raised by OWSLib or by the HTTP client
                    print(f'Request could not be made for some reason at time = {timestep} and layer = {layer}')
                    pixel_value.append(np.nan)
        return pixel_value
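The value extraction in the function can also be pulled out into a small helper that is easier to test on its own. This is a sketch under one assumption: the text/plain response contains a line of the form value_0 = '3.5', which is what the regex in request() implies. parse_pixel_value is a hypothetical name, and the sample text is made up for illustration.

```python
import math
import re

# Assumed response line: value_0 = '3.5' (inferred from the regex in request()).
VALUE_PATTERN = re.compile(r"value_0\s*=\s*'([^']*)'")


def parse_pixel_value(text: str) -> float:
    """Return the first value_0 entry as a float, or NaN if missing or not numeric."""
    match = VALUE_PATTERN.search(text)
    if match is None:
        return math.nan
    try:
        return float(match.group(1))
    except ValueError:
        return math.nan


# Made-up sample in the assumed format.
sample = "Layer 'HRDPS.CONTINENTAL_TT'\n  Feature 0:\n    value_0 = '-3.5'\n"
print(parse_pixel_value(sample))
print(parse_pixel_value("no data here"))
```

Returning NaN in every failure case keeps the resulting list homogeneous, which makes the later conversion to a DataFrame simpler.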

The following is a reproducible example, just in case.

    # Assumes the imports and the request() function from the snippet above.
    Stations_info = pd.DataFrame({
        'ID': {0: 'COMPTN', 1: 'DUNHM'},
        'Lat': {0: 45.2608, 1: 45.0959},
        'Lon': {0: -71.8337, 1: -72.8126},
        'Alt': {0: 245, 1: 225},
        'Name': {0: 'Compton', 1: 'Dunham'},
        'Lon2': {0: -71.7337, 1: -72.71260000000001},
        'Lat2': {0: 45.1608, 1: 44.9959},
    })

    address = "geo.weather.gc.ca/geomet?service=WMS"  # for operational use
    # "http://collaboration.cmc.ec.gc.ca/rpn-wms"  # for experimental use
    Version = "1.3.0"
    HRDPS_varlist = ['HRDPS.CONTINENTAL_TT', 'HRDPS.CONTINENTAL_HR', "HRDPS.CONTINENTAL_PR"]
    wms = WebMapService('https://geo.weather.gc.ca/geomet?SERVICE=WMS' +
                        '&REQUEST=GetCapabilities',
                        version=Version,
                        timeout=300)


    async def process_request(arg):
        info = pd.DataFrame(arg).T
        print(f'Acquiring weather forecast for {arg.iloc[0]}')
        coor = (info[['Lon', 'Lat2', 'Lon2', 'Lat']].iloc[0].tolist())
        # Can add nb_timesteps to define how far we want to go. Must add as function argument
        nb_timestep = 2
        # HRDPS_df = await run_HRDPS(coor,nb_timestep)
        # Hardcoded for the sake of the example. This should change to always be tomorrow.
        datetime_str = '23-03-31 13:00:00'
        time_utc = [datetime.strptime(datetime_str, '%y-%m-%d %H:%M:%S')]
        time_local = time_utc.copy()

        # Make async requests for each layer in HRDPS_varlist and create a dictionary
        pixel_value_dict_HRDPS = {
            layer: await request(layer, time_utc[:nb_timestep], coor)
            for layer in HRDPS_varlist}

        # Convert the dictionary to a pandas DataFrame
        HRDPS_df = pd.DataFrame.from_dict(pixel_value_dict_HRDPS, orient='index').transpose()
        HRDPS_df['Date'] = time_local[:nb_timestep]
        HRDPS_df['HRDPS.ECONTINENTAL_PR'] = HRDPS_df['HRDPS.CONTINENTAL_PR'].diff()
        print(HRDPS_df)


    async def main():
        tasks = [process_request(row) for _, row in Stations_info.iterrows()]
        await asyncio.gather(*tasks)


    if __name__ == "__main__":
        asyncio.run(main())
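Note that the dictionary comprehension in process_request awaits request() once per layer, so the layers of a given station are fetched one after another; only the stations themselves run concurrently. If the layers should overlap as well, the calls can be gathered. Here is a minimal sketch of that idea, with fake_request standing in for request() (a hypothetical stub that only sleeps) so that the snippet runs without network access:

```python
import asyncio
import time


async def fake_request(layer: str, delay: float = 0.1) -> list:
    """Stand-in for request(): pretends to wait on the network."""
    await asyncio.sleep(delay)
    return [f"{layer} value"]


async def fetch_layers(layers: list) -> dict:
    """Start one coroutine per layer and wait for all of them together."""
    results = await asyncio.gather(*(fake_request(layer) for layer in layers))
    # gather() returns results in the same order as its arguments
    return dict(zip(layers, results))


layers = ['HRDPS.CONTINENTAL_TT', 'HRDPS.CONTINENTAL_HR', 'HRDPS.CONTINENTAL_PR']
start = time.perf_counter()
values = asyncio.run(fetch_layers(layers))
elapsed = time.perf_counter() - start
print(values)
print(f"{elapsed:.2f}s")
```

In process_request this would amount to replacing the comprehension with a gather over request(layer, time_utc[:nb_timestep], coor) for each layer in HRDPS_varlist.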
