
I am trying to perform a somewhat involved operation on a dataframe involving grouping and operations on numpy arrays. I can get the operation to work in Pandas, but the equivalent code in Dask gives me an error that I haven't figured out how to work around.

I have two kinds of items in my dataframe: shapes and colors. Each item is associated with a vector represented as a numpy array.

from pandas import DataFrame
from numpy import array
from numpy import mean as array_mean
from scipy.spatial.distance import cosine

p = DataFrame({
    "type": ["shape", "shape", "color", "color", "color"],
    "vector": [
        array([1.0, 1.1]),
        array([0.8, 0.9]),
        array([0.6, 0.8]),
        array([1.1, 1.2]),
        array([0.7, 0.9]),
    ]
})

    type      vector
0  shape  [1.0, 1.1]
1  shape  [0.8, 0.9]
2  color  [0.6, 0.8]
3  color  [1.1, 1.2]
4  color  [0.7, 0.9]

I want to do the following:

  1. Group the items by type.
  2. Take the mean vector for each group.
  3. For each item, calculate the cosine distance of its vector to its group's mean and sort within the group by this distance.
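For step 3, `scipy.spatial.distance.cosine` returns the cosine *distance*, i.e. one minus the cosine similarity. As a quick sanity check of that formula (pure Python, `cosine_distance` is a stand-in for the scipy function), here is the first "shape" row of the example measured against the shape group's mean of [0.9, 1.0]:

```python
from math import sqrt

def cosine_distance(u, v):
    # cosine distance = 1 - (u . v) / (|u| * |v|),
    # matching scipy.spatial.distance.cosine for nonzero vectors
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = sqrt(sum(a * a for a in u))
    norm_v = sqrt(sum(b * b for b in v))
    return 1.0 - dot / (norm_u * norm_v)

# mean of the two "shape" vectors [1.0, 1.1] and [0.8, 0.9] is [0.9, 1.0]
print(cosine_distance([1.0, 1.1], [0.9, 1.0]))  # tiny: the vector points almost along the mean
```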

The following function accomplishes this with Pandas.

def pandas_ordering(f):
    means = f.groupby("type")["vector"].apply(array_mean).to_frame().rename(columns={"vector": "mean"})
    f = f.merge(means, left_on="type", right_index=True)
    f["cosine distance"] = f.apply(lambda row: cosine(row["vector"], row["mean"]), axis=1)
    return f.groupby("type", group_keys=False).apply(lambda x: x.sort_values("cosine distance"))

pandas_ordering(p)

    type      vector                                      mean  cosine distance
4  color  [0.7, 0.9]  [0.8000000000000002, 0.9666666666666667]         0.000459
2  color  [0.6, 0.8]  [0.8000000000000002, 0.9666666666666667]         0.001144
3  color  [1.1, 1.2]  [0.8000000000000002, 0.9666666666666667]         0.001280
0  shape  [1.0, 1.1]                                [0.9, 1.0]         0.000012
1  shape  [0.8, 0.9]                                [0.9, 1.0]         0.000019

I rewrote the function in Dask. The logic is identical, and the code is almost identical except for a couple of meta decorations.

import dask.dataframe as dd

f = dd.from_pandas(p, npartitions=1)

def dask_ordering(f):
    means = f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0: "mean"})
    f = f.merge(means, left_on="type", right_index=True)
    f["cosine distance"] = f.apply(lambda row: cosine(row["vector"], row["mean"]), axis=1, meta="object")
    f.groupby("type", group_keys=False).apply(lambda x: x.sort_values("cosine distance"), meta="object")
    return f

However, the Dask version gives an error when it tries to merge the frame of means with the original frame of vectors.

dask_ordering(f).compute()

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-120-46dd96a5db68> in <module>
----> 1 dask_ordering(f).compute()

<ipython-input-119-49ddda5479b1> in dask_ordering(f)
      5 def dask_ordering(f):
      6     means = f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0:"mean"})
----> 7     f = f.merge(means, left_on="type", right_index=True)
      8     f["cosine distance"] = f_1.apply(lambda row:cosine(row["vector"], row["mean"]), axis=1, meta="object")
      9     f.groupby("type", group_keys=False).apply(lambda x:x.sort_values("cosine distance"), meta="object")

~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/core.py in merge(self, right, how, on, left_on, right_on, left_index, right_index, suffixes, indicator, npartitions, shuffle)
   3768             npartitions=npartitions,
   3769             indicator=indicator,
-> 3770             shuffle=shuffle,
   3771         )
   3772

~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/multi.py in merge(left, right, how, on, left_on, right_on, left_index, right_index, suffixes, indicator, npartitions, shuffle, max_branch)
    490         right_index=right_index,
    491         suffixes=suffixes,
--> 492         indicator=indicator,
    493     )
    494

~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/multi.py in single_partition_join(left, right, **kwargs)
    321     # new index will not necessarily correspond the current divisions
    322
--> 323     meta = left._meta_nonempty.merge(right._meta_nonempty, **kwargs)
    324     kwargs["empty_index_dtype"] = meta.index.dtype
    325     name = "merge-" + tokenize(left, right, **kwargs)

~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/frame.py in merge(self, right, how, on, left_on, right_on, left_index, right_index, sort, suffixes, copy, indicator, validate)
   7347             copy=copy,
   7348             indicator=indicator,
-> 7349             validate=validate,
   7350         )
   7351

~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in merge(left, right, how, on, left_on, right_on, left_index, right_index, sort, suffixes, copy, indicator, validate)
     79         copy=copy,
     80         indicator=indicator,
---> 81         validate=validate,
     82     )
     83     return op.get_result()

~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in __init__(self, left, right, how, on, left_on, right_on, axis, left_index, right_index, sort, suffixes, copy, indicator, validate)
    628         # validate the merge keys dtypes. We may need to coerce
    629         # to avoid incompat dtypes
--> 630         self._maybe_coerce_merge_keys()
    631
    632         # If argument passed to validate,

~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in _maybe_coerce_merge_keys(self)
   1136             inferred_right in string_types and inferred_left not in string_types
   1137         ):
-> 1138             raise ValueError(msg)
   1139
   1140         # datetimelikes must match exactly

ValueError: You are trying to merge on object and int64 columns. If you wish to proceed you should use pd.concat

Presumably Dask is confused about the type of one of the columns, but I'm not sure which one. In particular, I'm not sure which column the "int64" in the error message refers to.
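The traceback shows the failure happens when Dask merges its internal metadata frames (`left._meta_nonempty.merge(right._meta_nonempty, ...)`), so the int64 key need not be visible in the computed data at all. The same kind of ValueError can be reproduced in plain pandas by merging an object-typed column against a default int64 RangeIndex (toy frames below, not the real data):

```python
import pandas as pd

left = pd.DataFrame({"type": ["shape", "color"]})   # object-typed merge key
right = pd.DataFrame({"mean": [0.5, 0.7]})          # default RangeIndex -> int64

try:
    left.merge(right, left_on="type", right_index=True)
except ValueError as err:
    # pandas refuses to merge an object key against an int64 key
    print(err)
```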

The intermediate frame of mean vectors looks the same in both Pandas and Dask.

p.groupby("type")["vector"].apply(array_mean).to_frame().rename(columns={"vector":"mean"}) 

and

f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0:"mean"}).compute() 

both give

                                           mean
type
color  [0.8000000000000002, 0.9666666666666667]
shape                                [0.9, 1.0]
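Although the printed values are identical, the merge-key dtypes need not match. One way to check what pandas considers the key type is to inspect the index dtype directly; for the pandas frame it is object, while the traceback suggests the metadata Dask builds for the same frame carries an int64 index. A toy frame standing in for the pandas `means` result:

```python
import pandas as pd

# Stand-in for the pandas means frame: a string index named "type"
means = pd.DataFrame(
    {"mean": [[0.8, 0.9666666666666667], [0.9, 1.0]]},
    index=pd.Index(["color", "shape"], name="type"),
)
print(means.index.dtype)  # object - merging this against the "type" column works
```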

Does anyone know how to make this work in Dask?

1 Answer

The problem is in how the means dataframe is created: its index ends up typed as int64 rather than str, so the merge compares an object column against an int64 index. (This is probably a bug worth raising on GitHub - for some reason the grouping key may be getting converted to a categorical or similar type along the way.)

In the meantime, the function below is a workaround: it copies the index into a regular column, casts it to str, and merges on that column instead of the index.

def dask_ordering(f):
    means = f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame(name="mean")
    # Copy the index into a column and cast it to str, so both merge keys
    # are object-typed and the dtype check passes.
    means['idx'] = means.index
    means['idx'] = means['idx'].astype(str)
    f = f.merge(means, left_on="type", right_on="idx")
    f["cosine distance"] = f.apply(lambda row: cosine(row["vector"], row["mean"]), axis=1, meta="object")
    # Sort each group by distance and return the result.
    return f.groupby("type", group_keys=False).apply(lambda x: x.sort_values("cosine distance"), meta="object")
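The essence of the workaround is that `astype(str)` gives both merge keys the same object dtype. A minimal pandas-only illustration of the trick (toy frames with hypothetical values, not the real data):

```python
import pandas as pd

left = pd.DataFrame({"type": ["0", "1"]})     # object-typed key
right = pd.DataFrame({"mean": [10.0, 20.0]})  # default int64 RangeIndex

# Merging left_on="type" with right_index=True would raise the dtype ValueError;
# copying the index into a str column sidesteps it.
right["idx"] = right.index.astype(str)
merged = left.merge(right, left_on="type", right_on="idx")
print(merged["mean"].tolist())  # -> [10.0, 20.0]
```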