I have a dask dataframe (df) with around 250 million rows (from a 10 GB CSV file). I have another pandas dataframe (ndf) of 25,000 rows. I would like to add the first column of the pandas dataframe to the dask dataframe, repeating each item 10,000 times.

Here's the code that I tried. I have reduced the problem to a smaller size.

    import dask.dataframe as dd
    import pandas as pd
    import numpy as np

    pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv")
    df = dd.read_csv("tempfile.csv")
    ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))
    df['Node'] = np.repeat(ndf[0], 10)

With this code, I end up with an error.

ValueError: Not all divisions are known, can't align partitions. Please use set_index to set the index.

I can perform a reset_index() followed by a set_index() to make df.known_divisions True for the dask dataframe, but that is a time-consuming operation. Is there a better, faster way to do what I am trying to do? Can I do this using pandas itself?

The end goal is to find rows from ndf where any of the corresponding rows from df matches some criteria.

  • Hi Najeem, were you able to check my proposed solution? Commented Feb 22, 2019 at 12:37

2 Answers

1

Your basic algorithm is "I'd like the first 10 values of df['Node'] to be set to the first value of ndf, the next 10 values to the next value of ndf, and so on". This is hard in Dask because it doesn't know how many rows are in each partition: you are reading from CSVs, and the number of rows you get in X bytes depends on exactly what the data are like in each part. Other formats give you more information...
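A rough plain-Python illustration of that point (this is not how Dask itself splits files; the function name rows_per_block and the 32-byte block size are made up for the example): cutting equal numbers of bytes out of a CSV yields very different row counts depending on how wide the rows are.

```python
def rows_per_block(lines, block_bytes):
    """Group lines into blocks of roughly block_bytes bytes; return rows per block."""
    counts, size, rows = [], 0, 0
    for line in lines:
        size += len(line.encode("utf-8")) + 1  # +1 for the newline
        rows += 1
        if size >= block_bytes:
            # Block is full: record how many rows it held and start a new one
            counts.append(rows)
            size, rows = 0, 0
    if rows:
        counts.append(rows)  # last, partially filled block
    return counts

short_rows = ["1,2"] * 8              # 4 bytes per row, newline included
long_rows = ["1000000,2000000"] * 8   # 16 bytes per row, newline included
counts = rows_per_block(short_rows + long_rows, 32)
print(counts)
```

The first block holds 8 rows and the later ones hold 2 each, even though every block covers the same number of bytes. Without reading the data, there is no way to know which rows land in which block.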

You will, therefore, certainly need two passes through the data. You could work with the index, to figure out divisions and potentially do some sorting. To my mind, the easiest thing you can do is simply to measure the partition lengths, and so get the offset of the start of each:

    lengths = df.map_partitions(len).compute()
    # start offset of each partition = total length of the partitions before it
    offsets = np.cumsum(lengths.values) - lengths.values

and now use a custom delayed function to work on the parts:

    import dask

    @dask.delayed
    def add_node(part, offset, ndf):
        # global row number of each row, integer-divided by the repeat factor (10)
        pos = pd.Series(np.arange(offset, offset + len(part)) // 10, index=part.index)
        # look up the matching value in the first column of ndf
        return part.assign(Node=pos.map(ndf[0]))

    df2 = dd.from_delayed([add_node(d, off, ndf) for d, off in zip(df.to_delayed(), offsets)])
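As a sanity check on the arithmetic, here is a minimal plain-Python sketch with no Dask involved. The partition lengths, the repeat factor of 2, and the helper names start_offsets and node_for_partition are made up for illustration. The point is only that each partition's start offset is the total length of the partitions before it, and that integer-dividing the global row number by the repeat factor picks the matching ndf row.

```python
from itertools import accumulate

def start_offsets(lengths):
    """Global row number at which each partition starts."""
    # Running total of the lengths, shifted right by one: [0, l0, l0+l1, ...]
    return [0] + list(accumulate(lengths))[:-1]

def node_for_partition(length, offset, values, repeat):
    """Value assigned to each row of one partition."""
    # The row at global position p gets values[p // repeat]
    return [values[p // repeat] for p in range(offset, offset + length)]

lengths = [3, 4, 1]            # hypothetical, uneven partition lengths
values = ["a", "b", "c", "d"]  # stands in for ndf[0]
repeat = 2                     # stands in for the repeat factor of 10

offsets = start_offsets(lengths)
parts = [node_for_partition(n, off, values, repeat)
         for n, off in zip(lengths, offsets)]
flat = [v for part in parts for v in part]
print(offsets, flat)
```

Each value ends up repeated exactly twice across the concatenated partitions, even though the partition boundaries fall in the middle of a repeat group.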

0

Using your same workflow, you can manually set the divisions, as suggested here:

    import dask.dataframe as dd
    import pandas as pd
    import numpy as np

    pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv", index=False)
    df = dd.read_csv("tempfile.csv")
    ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))
    df.divisions = (0, len(df)-1)
    df["Node"] = dd.from_array(np.repeat(ndf.values, 10))

I don't think np.repeat is very efficient, particularly for a big df.

4 Comments

The example ran fine. However, when I tried it on the original dataframe, the line df.divisions = (0, len(df)-1) took half an hour to run. I assume my filtering operations would take a similar time, and I think reset_index() and set_index() would too.
I guess the problem here is len(df). Would you mind running just that to see the timing? Working with such a big CSV file could be a problem too. Have you tried splitting it into several smaller files? See this
Yeah, that's how I solved the problem for the time being. But it's not a scalable method. I could just as well use chunks in pandas. But I wanted to use dask so that I could scale the problem really quickly (using other machines on my network).
I made the same mistake when I started using dask, and I saw that colleagues who use Spark face the same problems. It is actually worth spending time partitioning your data nicely and saving it in a columnar format such as Parquet. Then, using the very same dask command, you could see a performance boost.