
I have a very large raster and a shapefile with 79,867 polygons, and I want to efficiently extract statistics from the raster within each polygon. To do so, I have combined rasterio and geopandas: I read the shapefile into a GeoDataFrame, iterate over its rows, cookie-cut the raster with each row's geometry, turn the result into a numpy array, and compute whatever statistics I need from it (see the code below). I store the statistics in a dictionary and convert the final dictionary into a pandas DataFrame. This is a slow process, of course, because iterating over DataFrame rows is inherently slow. One feature of the code below is that it periodically saves partial results to a CSV file and prints a progress report.

My goal now is to parallelize this algorithm. Most of the suggestions I have found involve splitting the DataFrame into chunks and processing each chunk in a vectorized fashion on a different core, which can be done with a multiprocessing pool and for which there are plenty of examples. However, if I do that, I will lose (at least as far as my coding skills go) the ability to produce progress reports and save partial results. In addition, because the cookie-cut step is rather slow, I am not convinced that vectorizing it will yield much of a speed gain, and I am not sure a GeoDataFrame will handle such a complex operation well. So my inclination is to keep the row-by-row processing and distribute individual rows across cores, rather than partitioning the whole DataFrame as suggested. I would like some advice on the best way to parallelize this process. Unfortunately, most examples I have come across for parallelizing pandas operations apply very simple functions, like x*x, which is far from this case.

import numpy as np
import pandas as pd
import geopandas as gpd
import rasterio as rio
from rasterio.mask import mask
from time import time

StartTime = time()
geodf = gpd.read_file("data/polygon.shp")
statsDict = {}

with rio.open("data/raster.tif") as src:
    profile = src.profile
    n = 0
    for row in geodf.itertuples():
        if n > 0 and n % 5000 == 0:
            perc = 100 * n / geodf.shape[0]
            print(int(perc), ' %', (time() - StartTime) / 60, ' minutes')
            forecastFinish = round(((100 * (time() - StartTime) / perc) / 3600), 1)
            print(' estimated time to finish: %s hrs' % forecastFinish)
            df = pd.DataFrame(statsDict.items(), columns=['feature_id', 'meanScore'])
            fileName = 'results/partial_' + str(n) + '.csv'
            df.to_csv(fileName, index=False)
            statsDict = {}
        feature_id = row.feature_id
        IterPolygon = [row.geometry]
        mask_feature_id, mask_feature_id_Transform = mask(src, IterPolygon, invert=False)
        flat_mask_feature_id = np.ravel(mask_feature_id)
        # convertNpArrayIntoPreppedList is a user-defined helper (not shown) that
        # filters the flattened array (e.g. drops nodata) before taking the mean
        meanScore = np.mean(convertNpArrayIntoPreppedList(flat_mask_feature_id))
        statsDict[feature_id] = meanScore
        n += 1

df = pd.DataFrame(statsDict.items(), columns=['feature_id', 'meanScore'])
df.to_csv('results/final.csv', index=False)
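A minimal sketch of the row-by-row parallelization described in the question might look like the following. It is only an illustration, not the original code: the helper names (init_worker, row_stats), the worker count, and the chunksize are assumptions, and each worker opens its own raster handle because open rasterio dataset objects cannot be pickled. Results stream back to the main process, so progress reports and partial CSV saves still work.

import multiprocessing as mp

import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio as rio
from rasterio.mask import mask

RASTER_PATH = "data/raster.tif"
_src = None  # per-worker raster handle

def init_worker(raster_path):
    # Runs once in each worker process: open the raster and keep it global.
    global _src
    _src = rio.open(raster_path)

def row_stats(task):
    # task is a (feature_id, shapely geometry) tuple; geometries pickle fine.
    feature_id, geom = task
    out_image, _ = mask(_src, [geom], crop=True, filled=False)
    values = out_image.compressed()  # keep only cells inside the polygon and not nodata
    mean_score = float(values.mean()) if values.size else np.nan
    return feature_id, mean_score

if __name__ == "__main__":
    geodf = gpd.read_file("data/polygon.shp")
    tasks = list(zip(geodf["feature_id"], geodf["geometry"]))
    results = {}
    with mp.Pool(processes=4, initializer=init_worker,
                 initargs=(RASTER_PATH,)) as pool:
        # imap_unordered yields rows as they finish, so progress reporting
        # and partial saves work the same way as in the serial loop.
        for i, (fid, score) in enumerate(
                pool.imap_unordered(row_stats, tasks, chunksize=500), 1):
            results[fid] = score
            if i % 5000 == 0:
                print(f"{100 * i / len(tasks):.0f} % done")
                # unlike the per-batch files above, these partial files are cumulative
                pd.DataFrame(list(results.items()),
                             columns=["feature_id", "meanScore"]
                             ).to_csv(f"results/partial_{i}.csv", index=False)
    pd.DataFrame(list(results.items()), columns=["feature_id", "meanScore"]
                 ).to_csv("results/final.csv", index=False)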
  • Doing it in chunks is a solid approach. You can send the results from processing each of the chunks back to the main process, where it can report progress (based on the number of chunks completed) and write to the CSV. Commented Jun 10, 2022 at 22:39
  • Yes, I thought about that. However, if I do that, one caveat is that I would like to split the dataframe into a number of chunks much greater than the number of cores, which implies some chunks will remain in a queue. I couldn't find a good alternative for that yet. Commented Jun 10, 2022 at 22:45
  • That's perfectly fine; you can use something like concurrent.futures to manage a large number of tasks running in a smaller number of threads/processes. Commented Jun 10, 2022 at 22:49
  • I think this is the proper way to parallelize. Thanks. However, vectorizing the mask to use each row as a different polygon is not trivial, and there is not a lot of documentation out there. I guess this is what led to my earlier decision to iterate over rows instead of applying the function to the dataframe. Commented Jun 11, 2022 at 0:44
  • I successfully made this process work. A few comments: I first split the main dataframe using df_split = np.array_split(geodf, num_partitions), where num_partitions is the number of cores I want to use. The split itself takes a lot of time, and one needs to balance the number of partitions against the time to process each partition. Other than that, the process became almost 1000x faster than iterating over rows. Commented Jun 15, 2022 at 20:22
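A minimal sketch of the chunked pattern described in these comments: the GeoDataFrame is split with np.array_split into many more chunks than cores, each chunk is processed in a worker process, and concurrent.futures.as_completed lets the main process report progress and save partial results as chunks finish. The function name chunk_stats, the chunk and worker counts, and the feature_id column are assumptions for illustration.

import concurrent.futures as cf

import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio as rio
from rasterio.mask import mask

def chunk_stats(chunk, raster_path):
    # Process one chunk of the GeoDataFrame; the raster is opened inside the
    # worker because open dataset handles cannot be shared across processes.
    out = {}
    with rio.open(raster_path) as src:
        for row in chunk.itertuples():
            out_image, _ = mask(src, [row.geometry], crop=True, filled=False)
            values = out_image.compressed()
            out[row.feature_id] = float(values.mean()) if values.size else np.nan
    return out

if __name__ == "__main__":
    geodf = gpd.read_file("data/polygon.shp")
    chunks = np.array_split(geodf, 64)  # many more chunks than cores
    results = {}
    with cf.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(chunk_stats, c, "data/raster.tif")
                   for c in chunks]
        for i, future in enumerate(cf.as_completed(futures), 1):
            results.update(future.result())
            print(f"{i}/{len(chunks)} chunks done")
            # cumulative partial save after every finished chunk
            pd.DataFrame(list(results.items()),
                         columns=["feature_id", "meanScore"]
                         ).to_csv(f"results/partial_{i}.csv", index=False)
    pd.DataFrame(list(results.items()), columns=["feature_id", "meanScore"]
                 ).to_csv("results/final.csv", index=False)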

1 Answer


As an alternative to rolling your own parallel-processing implementation, here's a simple example that uses dask and dask-geopandas to parallelize polygon-on-raster statistics with the rasterstats package.

To handle errors, you might want to look at iterating over successful operations as described in the debugging section of the dask documentation.

from dask.distributed import Client
import dask_geopandas as dgpd
import geopandas as gpd
import pandas as pd
import scipy.stats
from rasterstats import zonal_stats

def kurtosis(arr):
    try:
        k = scipy.stats.kurtosis(arr.compressed())
    except AttributeError:
        # Not a masked array - 'numpy.ndarray' object has no attribute 'compressed'
        k = scipy.stats.kurtosis(arr)
    return k

def zonal_statistics(row, *args, **kwargs):
    # row is a single GeoDataFrame partition; compute the stats for that
    # partition only and join them back onto it
    return row.join(pd.DataFrame(zonal_stats(row, *args, **kwargs), index=row.index))

if __name__ == "__main__":
    client = Client()

    zones = "zones.shp"
    values = "values.tif"

    # pre-defined stats, see the full list at
    # https://pythonhosted.org/rasterstats/manual.html#zonal-statistics
    stats = ["mean", "std"]

    # user-defined stats
    # https://pythonhosted.org/rasterstats/manual.html#user-defined-statistics
    add_stats = {"kurtosis": kurtosis}

    gdf = gpd.read_file(zones)
    ddf = dgpd.from_geopandas(gdf, npartitions=4)

    # we need to tell dask_geopandas what the output should look like
    meta = ddf._meta.join(pd.DataFrame(columns=stats + list(add_stats.keys())))

    res = ddf.map_partitions(zonal_statistics, meta=meta, raster=values,
                             stats=stats, add_stats=add_stats, all_touched=True)
    results = res.compute()

    print(gdf.head())
    print(results.head())

Output:

   id                                           geometry
0   1  POLYGON ((440819.096 3751134.570, 440983.634 3...
1   2  POLYGON ((441147.373 3751255.178, 441265.585 3...
2   3  POLYGON ((441438.110 3751077.860, 441789.549 3...
3   4  POLYGON ((440808.713 3750651.340, 440925.327 3...
4   5  POLYGON ((441174.530 3750450.061, 441371.017 3...
   id                                           geometry  ...        std  kurtosis
0   1  POLYGON ((440819.096 3751134.570, 440983.634 3...  ...  16.546371 -0.129432
1   2  POLYGON ((441147.373 3751255.178, 441265.585 3...  ...   9.588606 -0.184888
2   3  POLYGON ((441438.110 3751077.860, 441789.549 3...  ...  18.036208  1.952527
3   4  POLYGON ((440808.713 3750651.340, 440925.327 3...  ...  12.462851 -0.581433
4   5  POLYGON ((441174.530 3750450.061, 441371.017 3...  ...  27.835329  0.251267

And here's an example of handling errors in the calculations:

from dask.dataframe import from_delayed
from dask.distributed import Client, LocalCluster, as_completed
import dask_geopandas as dgpd
import geopandas as gpd
import pandas as pd
import scipy.stats
from rasterstats import zonal_stats

def kurtosis(arr):
    try:
        k = scipy.stats.kurtosis(arr.compressed())
    except AttributeError:
        # Not a masked array - 'numpy.ndarray' object has no attribute 'compressed'
        k = scipy.stats.kurtosis(arr)
    return k

def zonal_statistics(row, *args, **kwargs):
    from random import randint
    if randint(0, 3) == 1:
        raise RuntimeError('Fake random error to test')
    gdf = row.compute()
    return gdf.join(pd.DataFrame(zonal_stats(gdf, *args, **kwargs), index=gdf.index))

if __name__ == "__main__":
    zones = "zones.shp"
    values = "values.tif"
    stats = ["mean", "std"]
    add_stats = {"kurtosis": kurtosis}

    cluster = LocalCluster()
    client = Client(cluster)

    gdf = gpd.read_file(zones)
    ddf = dgpd.from_geopandas(gdf, npartitions=4)

    # we need to tell dask_geopandas what the output should look like
    meta = ddf._meta.join(pd.DataFrame(columns=stats + list(add_stats.keys())))

    futures = [
        client.submit(zonal_statistics, p, raster=values, stats=stats,
                      add_stats=add_stats, all_touched=True)
        for p in ddf.partitions
    ]

    completed = []
    for future in as_completed(futures):
        try:
            data = future.result()
            completed.append(future)
        except Exception as exc:
            pass

    results = from_delayed(completed, meta=meta, verify_meta=False).compute()

    print(gdf.head())
    print(f"{len(gdf)} rows")
    print(results.head())
    print(f"{len(results)} rows")

Output:

2022-06-11 19:14:51,192 - distributed.worker - WARNING - Compute Failed
Key:       zonal_statistics-bddca01b8c33227011c65ea05a10f813
Function:  zonal_statistics
args:      (Dask GeoDataFrame Structure:
                  id  geometry
           npartitions=1
           6  int64  geometry
           7    ...       ...
           Dask Name: blocks, 5 tasks)
kwargs:    {'raster': 'values.tif', 'stats': ['mean', 'std'], 'add_stats': {'kurtosis': <function kurtosis at 0x7f5e4a7dbe50>}, 'all_touched': True}
Exception: "RuntimeError('Fake random error to test')"

   id                                           geometry
0   1  POLYGON ((440819.096 3751134.570, 440983.634 3...
1   2  POLYGON ((441147.373 3751255.178, 441265.585 3...
2   3  POLYGON ((441438.110 3751077.860, 441789.549 3...
3   4  POLYGON ((440808.713 3750651.340, 440925.327 3...
4   5  POLYGON ((441174.530 3750450.061, 441371.017 3...
8 rows
   id                                           geometry  ...        std  kurtosis
0   1  POLYGON ((440819.096 3751134.570, 440983.634 3...  ...  16.546371 -0.129432
1   2  POLYGON ((441147.373 3751255.178, 441265.585 3...  ...   9.588606 -0.184888
2   3  POLYGON ((441438.110 3751077.860, 441789.549 3...  ...  18.036208  1.952527
3   4  POLYGON ((440808.713 3750651.340, 440925.327 3...  ...  12.462851 -0.581433
4   5  POLYGON ((441174.530 3750450.061, 441371.017 3...  ...  27.835329  0.251267
6 rows
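Because the question also asks for progress reports and partial results, the as_completed pattern above can provide both: each finished partition can be logged and written out as soon as it arrives. Below is a minimal sketch along those lines, reusing the zones.shp / values.tif names from the answer; the single "mean" statistic, the partition count, and the results/ file names are placeholders.

from dask.distributed import Client, as_completed
import dask_geopandas as dgpd
import geopandas as gpd
import pandas as pd
from rasterstats import zonal_stats

def zonal_statistics(part, raster):
    # part is one dask partition; compute it in the worker and join the stats on
    gdf = part.compute()
    return gdf.join(pd.DataFrame(zonal_stats(gdf, raster, stats=["mean"]),
                                 index=gdf.index))

if __name__ == "__main__":
    client = Client()
    gdf = gpd.read_file("zones.shp")
    ddf = dgpd.from_geopandas(gdf, npartitions=16)

    futures = [client.submit(zonal_statistics, p, "values.tif")
               for p in ddf.partitions]

    parts = []
    for n_done, future in enumerate(as_completed(futures), 1):
        parts.append(future.result())
        print(f"{n_done}/{len(futures)} partitions done")
        # save everything finished so far, mirroring the partial CSVs in the question
        pd.concat(parts).drop(columns="geometry").to_csv(
            f"results/partial_{n_done}.csv", index=False)

    pd.concat(parts).drop(columns="geometry").to_csv("results/final.csv", index=False)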
