
I am trying to parallelize a function that performs some calculations on a Zarr datacube read from an S3 bucket. However, when using a LocalCluster from dask.distributed, the dataset loses the .rio accessor that I need to clip it with a shapefile. Below is the function I am trying to parallelize.


import os

import geopandas as gpd
import numpy as np
import pandas as pd

# terminus_dates and terminus_poly_dir are defined earlier in the notebook
def list_outputs(kuj_vels_filt):
    saved_means       = []
    saved_dates       = []
    saved_coverage    = []
    filtered_datasets = []
    # Iterate over each mid_date in kuj_vels_filt
    for mid_date in kuj_vels_filt.mid_date.values:
        mid_date = pd.to_datetime(mid_date)  # convert numpy datetime64 to a pandas Timestamp

        # Find the nearest date in terminus_dates
        nearest_date = min(terminus_dates, key=lambda x: abs(x - mid_date))

        # Build the shapefile name from the nearest date
        filename = nearest_date.strftime('%Y_%m_%d_term_polygon.shp')
        filepath = os.path.join(terminus_poly_dir, filename)

        # Read the terminus polygon, reproject it to EPSG:3413, and clip the datacube
        clip_poly = gpd.read_file(filepath)
        kuj_poly_3413 = clip_poly.to_crs(3413)
        kuj_vels_filt_clip = kuj_vels_filt.rio.clip(kuj_poly_3413.geometry.values,
                                                    kuj_poly_3413.crs, drop=True, invert=False)

        # Fraction of the polygon area covered by valid (non-NaN) 120 m pixels
        data = kuj_vels_filt_clip.v.sel(mid_date=mid_date)
        pixel_area = data.count(dim=["x", "y"]) * 120 * 120
        area = kuj_poly_3413.area
        pixel_mask = np.divide(pixel_area, area[0])

        # Mean velocity inside the clipped polygon for this mid_date
        mean_value = kuj_vels_filt_clip.v.sel(mid_date=mid_date).mean(skipna=True)

        filtered_datasets.append(kuj_vels_filt_clip)
        saved_means.append(mean_value)
        saved_dates.append(mid_date)
        saved_coverage.append(pixel_mask)

    return saved_means, saved_dates, saved_coverage

Below are the lines where I create the client for the LocalCluster:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=12)
c = Client(cluster)

# Split kuj_vels into manual chunks along mid_date
# (cpus and manual_chunk are defined earlier in the notebook)
vel_ts_list = []
for i in range(cpus):
    chunked_vel = kuj_vels.isel(mid_date=slice(i * manual_chunk, manual_chunk * (i + 1)))
    vel_ts_list.append(chunked_vel)

futures = c.map(list_outputs, vel_ts_list)
results = c.gather(futures)

I end up getting this error:

Exception: 'AttributeError("\'Dataset\' object has no attribute \'rio\'")'

Here is a link to the Jupyter notebook in question.

1 Answer


I encountered a similar issue when using multiple threads. For me, the problem was that the rio accessor from rioxarray never gets registered in the environment where the worker runs the function. I only found a workaround: import rioxarray again inside the function that the worker executes.
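Here is a minimal, self-contained sketch of that workaround; worker_fn and the toy dataset are hypothetical stand-ins for list_outputs and the velocity datacube from the question:

import numpy as np
import xarray as xr
from dask.distributed import Client, LocalCluster

def worker_fn(ds):
    # Importing rioxarray here, inside the mapped function, runs the import
    # in the worker that executes the task, which is what registers the
    # .rio accessor on xarray objects in that environment.
    import rioxarray  # noqa: F401

    ds = ds.rio.write_crs("EPSG:3413")  # .rio is available again
    return float(ds.v.mean(skipna=True))

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2)
    client = Client(cluster)

    # Toy stand-in for the velocity datacube
    ds = xr.Dataset(
        {"v": (("mid_date", "y", "x"), np.random.rand(4, 5, 5))},
        coords={"mid_date": np.arange(4)},
    )
    chunks = [ds.isel(mid_date=slice(i, i + 2)) for i in range(0, 4, 2)]

    futures = client.map(worker_fn, chunks)
    print(client.gather(futures))

Applied to the question's code, adding import rioxarray as the first line of list_outputs should be enough, since each dask worker runs the function in its own process, where rioxarray was never imported.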
