As an alternative to rolling your own parallel-processing implementation, here's a simple example of using dask and dask-geopandas to parallelize polygon-on-raster statistics with the rasterstats package.
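For reference, the underlying rasterstats call that gets parallelized can also be run serially on the same inputs; a minimal sketch, assuming the same zones.shp, values.tif, and user-defined kurtosis statistic used in the dask examples below:

```python
import scipy.stats
from rasterstats import zonal_stats


def kurtosis(arr):
    # rasterstats passes each zone's pixels as a masked array
    return scipy.stats.kurtosis(arr.compressed())


# single-process zonal statistics, handy as a sanity check before adding dask
stats = zonal_stats(
    "zones.shp",
    "values.tif",
    stats=["mean", "std"],
    add_stats={"kurtosis": kurtosis},
    all_touched=True,
)
print(stats[:3])  # list of dicts, one per polygon
```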
To handle errors, you might want to look at iterating over the successful operations, as described in the debugging section of the dask documentation; the second example below uses that pattern.
```python
from dask.distributed import Client
import dask_geopandas as dgpd
import geopandas as gpd
import pandas as pd
import scipy.stats
from rasterstats import zonal_stats


def kurtosis(arr):
    try:
        k = scipy.stats.kurtosis(arr.compressed())
    except AttributeError:
        # Not a masked array - 'numpy.ndarray' object has no attribute 'compressed'
        k = scipy.stats.kurtosis(arr)
    return k


def zonal_statistics(row, *args, **kwargs):
    # "row" is one partition of the dask GeoDataFrame (a plain GeoDataFrame),
    # so compute the stats for this partition's features only
    return row.join(pd.DataFrame(zonal_stats(row, *args, **kwargs), index=row.index))


if __name__ == "__main__":
    client = Client()

    zones = "zones.shp"
    values = "values.tif"

    # pre-defined stats, see the full list at
    # https://pythonhosted.org/rasterstats/manual.html#zonal-statistics
    stats = ["mean", "std"]

    # user-defined stats
    # https://pythonhosted.org/rasterstats/manual.html#user-defined-statistics
    add_stats = {"kurtosis": kurtosis}

    gdf = gpd.read_file(zones)
    ddf = dgpd.from_geopandas(gdf, npartitions=4)

    # we need to tell dask_geopandas what the output should look like
    meta = ddf._meta.join(pd.DataFrame(columns=stats + list(add_stats.keys())))

    res = ddf.map_partitions(zonal_statistics, meta=meta, raster=values, stats=stats,
                             add_stats=add_stats, all_touched=True)
    results = res.compute()

    print(gdf.head())
    print(results.head())
```

Output:
```
   id                                           geometry
0   1  POLYGON ((440819.096 3751134.570, 440983.634 3...
1   2  POLYGON ((441147.373 3751255.178, 441265.585 3...
2   3  POLYGON ((441438.110 3751077.860, 441789.549 3...
3   4  POLYGON ((440808.713 3750651.340, 440925.327 3...
4   5  POLYGON ((441174.530 3750450.061, 441371.017 3...
   id                                           geometry  ...        std  kurtosis
0   1  POLYGON ((440819.096 3751134.570, 440983.634 3...  ...  16.546371 -0.129432
1   2  POLYGON ((441147.373 3751255.178, 441265.585 3...  ...   9.588606 -0.184888
2   3  POLYGON ((441438.110 3751077.860, 441789.549 3...  ...  18.036208  1.952527
3   4  POLYGON ((440808.713 3750651.340, 440925.327 3...  ...  12.462851 -0.581433
4   5  POLYGON ((441174.530 3750450.061, 441371.017 3...  ...  27.835329  0.251267
```

And here's an example of handling errors in the calculations:
```python
from dask.dataframe import from_delayed
from dask.distributed import Client, LocalCluster, as_completed
import dask_geopandas as dgpd
import geopandas as gpd
import pandas as pd
import scipy.stats
from rasterstats import zonal_stats


def kurtosis(arr):
    try:
        k = scipy.stats.kurtosis(arr.compressed())
    except AttributeError:
        # Not a masked array - 'numpy.ndarray' object has no attribute 'compressed'
        k = scipy.stats.kurtosis(arr)
    return k


def zonal_statistics(row, *args, **kwargs):
    from random import randint
    if randint(0, 3) == 1:
        # simulate an intermittent failure on some partitions
        raise RuntimeError('Fake random error to test')
    # each task receives a single-partition dask GeoDataFrame; materialize it
    gdf = row.compute()
    return gdf.join(pd.DataFrame(zonal_stats(gdf, *args, **kwargs), index=gdf.index))


if __name__ == "__main__":
    zones = "zones.shp"
    values = "values.tif"

    stats = ["mean", "std"]
    add_stats = {"kurtosis": kurtosis}

    cluster = LocalCluster()
    client = Client(cluster)

    gdf = gpd.read_file(zones)
    ddf = dgpd.from_geopandas(gdf, npartitions=4)

    # we need to tell dask_geopandas what the output should look like
    meta = ddf._meta.join(pd.DataFrame(columns=stats + list(add_stats.keys())))

    # submit one task per partition
    futures = [
        client.submit(zonal_statistics, p, raster=values, stats=stats,
                      add_stats=add_stats, all_touched=True)
        for p in ddf.partitions
    ]

    # keep only the partitions whose task succeeded
    completed = []
    for future in as_completed(futures):
        try:
            future.result()  # raises if the task failed
            completed.append(future)
        except Exception:
            pass  # skip (or log) the failed partition

    results = from_delayed(completed, meta=meta, verify_meta=False).compute()

    print(gdf.head())
    print(f"{len(gdf)} rows")
    print(results.head())
    print(f"{len(results)} rows")
```

Output:
```
2022-06-11 19:14:51,192 - distributed.worker - WARNING - Compute Failed
Key:       zonal_statistics-bddca01b8c33227011c65ea05a10f813
Function:  zonal_statistics
args:      (Dask GeoDataFrame Structure:
                              id  geometry
               npartitions=1
               6              int64  geometry
               7                ...       ...
               Dask Name: blocks, 5 tasks)
kwargs:    {'raster': 'values.tif', 'stats': ['mean', 'std'], 'add_stats': {'kurtosis': <function kurtosis at 0x7f5e4a7dbe50>}, 'all_touched': True}
Exception: "RuntimeError('Fake random error to test')"

   id                                           geometry
0   1  POLYGON ((440819.096 3751134.570, 440983.634 3...
1   2  POLYGON ((441147.373 3751255.178, 441265.585 3...
2   3  POLYGON ((441438.110 3751077.860, 441789.549 3...
3   4  POLYGON ((440808.713 3750651.340, 440925.327 3...
4   5  POLYGON ((441174.530 3750450.061, 441371.017 3...
8 rows
   id                                           geometry  ...        std  kurtosis
0   1  POLYGON ((440819.096 3751134.570, 440983.634 3...  ...  16.546371 -0.129432
1   2  POLYGON ((441147.373 3751255.178, 441265.585 3...  ...   9.588606 -0.184888
2   3  POLYGON ((441438.110 3751077.860, 441789.549 3...  ...  18.036208  1.952527
3   4  POLYGON ((440808.713 3750651.340, 440925.327 3...  ...  12.462851 -0.581433
4   5  POLYGON ((441174.530 3750450.061, 441371.017 3...  ...  27.835329  0.251267
6 rows
```
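If you'd rather see which partitions failed instead of silently skipping them, the futures themselves carry that information. Here's a minimal sketch of one way to adapt the as_completed loop, using the dask.distributed Future attributes status, key, and exception():

```python
# variation on the loop above: collect the failed futures as well, so the
# offending partitions can be inspected (or resubmitted) afterwards
completed, failed = [], []
for future in as_completed(futures):
    if future.status == "error":
        failed.append(future)
    else:
        completed.append(future)

for future in failed:
    # Future.exception() returns the remote exception without re-raising it
    print(f"task {future.key} failed: {future.exception()!r}")
```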