
I have a big dataset (300,000 examples × 33,000 features), which of course does not fit in memory. The data are saved in HDF5 format. The values are mostly zeros (sparse data). They look like this:

 Attr1              52      52      52      52      52      52      52      52     ...
 Attr2              umb     umb     umb     umb     umb     umb     umb     umb    ...
 CellID             TGC-1   TGG-1   CAG-1   TTC-1   GTG-1   GTA-1   CAA-1   CAC-1  ...
 Acc      Gene
 243485   RP11-.3   0       0       0       0       0       0       0       0      ...
 237613   FAM138A   0       0       0       0       0       0       0       0      ...
 186092   OR4F5     0       0       0       0       0       0       0       0      ...
 238009   RP11-.7   0       0       0       0       0       0       0       0      ...
 239945   RP11-.8   0       0       0       0       0       0       0       0      ...
 279457   FO538.2   0       0       0       0       0       0       0       0      ...
 228463   AP006.2   0       0       0       0       0       0       0       0      ...
 ...      ...       ...     ...     ...     ...     ...     ...     ...     ...     ...

I have done the following, which works, to load the whole dataset into TensorFlow (loompy is just a package that uses HDF5 in the background):

import tensorflow as tf
import numpy as np
import loompy as lp

batch_size = 1000

with lp.connect(filename, 'r') as ds:
    ds_shape = (batch_size, ds.shape[0])
    ds_dtype = ds[0:1, 0:1].dtype
    labels = np.asarray([ds.ca.CellID, ds.ca.Attr1]).T
    labels_shape = (batch_size, 1)

data_placeholder = tf.placeholder(ds_dtype, ds_shape)
labels_placeholder = tf.placeholder(labels[:, 1].dtype, labels_shape)

dataset = tf.data.Dataset.from_tensor_slices((data_placeholder, labels_placeholder))
dataset = dataset.prefetch(batch_size)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
    with lp.connect(filename, 'r') as ds:
        for i in range(0, ds.shape[1], batch_size):
            batch = ds[0:ds_shape[1], i:i + batch_size].T
            batch_labels = np.asarray([ds.ca.CellID[i:i + batch_size],
                                       ds.ca.Attr1[i:i + batch_size]]).T[:, 1]

            sess.run(iterator.initializer,
                     feed_dict={data_placeholder: batch,
                                labels_placeholder: batch_labels.reshape(batch_size, 1)})
            for _ in range(batch_size):
                print(sess.run(next_element))

Output:

(array([0, 0, 0, ..., 0, 0, 0], dtype=int32), array([b'52'], dtype=object))

(array([0, 0, 0, ..., 0, 0, 0], dtype=int32), array([b'52'], dtype=object))

...

This way, however, I am not able to split my data into train, test and evaluation sets. Also, I can only shuffle inside each batch, which is not effective, since most of the time the data in a batch belong to the same class.

How can I manipulate this kind of data so that I can load it as train, test and evaluation sets and perform shuffling etc. (preferably utilizing my Titan X GPU as much as possible)?

  • You may want to use TFRecords and store them as sparse features: tensorflow.org/versions/r1.2/api_docs/python/tf/SparseFeature (a sketch of this approach follows the comments below). Commented Jun 12, 2018 at 11:20
  • @vijaym The problem with TFRecords is that most examples are image related, so I haven't figured out how to do that with a dataset like this. Can you point me to the right sources? Commented Jun 12, 2018 at 12:12
  • You can check my answer for NumPy arrays; you may need to make some changes for sparse matrices: stackoverflow.com/questions/45427637/… Commented Jun 12, 2018 at 12:24
  • Are you open to making a second (shuffled and split) copy of the data on the disk? There are a number of large scale data processing tools that will do that for you. Commented Jun 14, 2018 at 2:24
  • @Omegastick I am open to everything that could work well. Can you name the tools you are talking about? Commented Jun 14, 2018 at 9:23
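
To illustrate the TFRecords suggestion from the first comment: below is a minimal sketch of serializing each sparse row as a tf.train.Example and parsing it back with tf.SparseFeature, using the TF 1.x API to match the code in the question. The feature keys ('indices', 'values', 'label'), the file name and the assumption of integer counts with an integer class label are illustrative, not taken from the question.

import numpy as np
import tensorflow as tf

N_FEATURES = 33000  # number of columns in the full matrix (assumption)

def write_sparse_tfrecords(rows, labels, path):
    """rows: iterable of 1-D dense integer numpy arrays; labels: iterable of ints."""
    with tf.python_io.TFRecordWriter(path) as writer:
        for row, label in zip(rows, labels):
            idx = np.nonzero(row)[0]
            example = tf.train.Example(features=tf.train.Features(feature={
                'indices': tf.train.Feature(int64_list=tf.train.Int64List(value=idx.tolist())),
                'values': tf.train.Feature(int64_list=tf.train.Int64List(value=row[idx].tolist())),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(label)])),
            }))
            writer.write(example.SerializeToString())

def parse_fn(serialized):
    """Parses one Example back into a SparseTensor of features and an int64 label."""
    parsed = tf.parse_single_example(serialized, features={
        'data': tf.SparseFeature(index_key='indices', value_key='values',
                                 dtype=tf.int64, size=N_FEATURES),
        'label': tf.FixedLenFeature([], tf.int64),
    })
    return parsed['data'], parsed['label']

# Hypothetical file name; the parsed dataset can then be shuffled and batched as usual.
dataset = tf.data.TFRecordDataset(['data.tfrecords']).map(parse_fn)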

3 Answers


You should definitely try Dask; it allows you to work with data that does not fit in memory, and it parallelizes computation so that you can use all the cores of your CPU. I also recommend moving your data from HDF5 to Parquet, which allows concurrent reads and writes and speeds things up. Please see the link where Wes McKinney (the pandas creator) goes in depth and compares it with other formats.

You could write snippets in Dask that prepare the train, test and validation sets and read them without exceeding the available memory.
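
For instance, a minimal sketch of that workflow, assuming the matrix has already been converted to a Parquet dataset that dask.dataframe can read (the file names and the 80/10/10 split ratios are made up):

import dask.dataframe as dd

# Read the Parquet dataset lazily; nothing is loaded into memory yet.
df = dd.read_parquet('data.parquet')

# Split into train / test / validation without materializing the data.
train, test, val = df.random_split([0.8, 0.1, 0.1], random_state=42)

# Write each split back to disk so it can later be streamed into the model.
train.to_parquet('train.parquet')
test.to_parquet('test.parquet')
val.to_parquet('val.parquet')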


4 Comments

How well do they blend with TensorFlow, though? I couldn't find much info online, except for some people experimenting with it. There's not much time for experimentation; I just want something that works nicely so I can focus on building the model.
As far as I know it does not blend at all; it is a library for processing big amounts of data in parallel. You would have to write a function that fits your needs; there won't be anything pre-built to feed TensorFlow models. Thankfully the API is almost exactly the same as in pandas, so it is really straightforward.
Well, the Loom format I use works pretty much like a pandas DataFrame, but that doesn't help me import the data and do the processing I want on it. I need an answer on how to handle such big data for different tasks (e.g. shuffling) in TensorFlow.
This is exactly why I recommend Parquet + Dask; it allows you to work with files much bigger than your RAM as if you were working with pandas.

In case someone is still interested in this topic, here is my solution to the problem I had. In the end I stuck with the Loompy file format, as it is really convenient for what I am doing (take a look at Loompy here). To import such a big volume of information into my model, I used the from_generator() function of the tf.data.Dataset TensorFlow API, and I created a generator to yield the data as needed.

Below is how my input function looks:

import numpy as np
import loompy as lp
import tensorflow as tf
from sklearn.model_selection import train_test_split

# FILE, TRAIN_RT, TEST_RT, EVAL_RT and the 'ae' flag (autoencoder mode:
# yield the input row itself as the label) are defined elsewhere.
model_input_name = ""
input_size = 10000
batch_size = 32
epochs = 10

# Input functions for train, test and eval sets.
def train_input_fn():
    return _input_fn('TRAIN')

def test_input_fn():
    return _input_fn('TEST')

def eval_input_fn():
    return _input_fn('EVAL')

# General purpose input function
def _input_fn(mode='TRAIN'):
    """
    Arguments
        mode : 'TRAIN', 'TEST', 'EVAL'
    """

    # A generator to yield data and labels from the given FILE,
    # based on the indices assigned to the "indices" variable.
    # If you change the labels, remember to update the from_generator()
    # parameters below, to reflect their datatype.
    def gen():
        with lp.connect(FILE, 'r') as ds:
            if ae:
                for i in indices:
                    yield {model_input_name: ds[:, i]}, ds[:, i]
            else:
                for i in indices:
                    yield {model_input_name: ds[:, i]}, ds.ca.x_CellType[i]

    # Get the indices for train, test and eval sets
    train_idx, test_idx, eval_idx = train_test_set_idx_split(TRAIN_RT, TEST_RT, EVAL_RT)

    # Check condition and assign the respective set to the "indices" variable
    if mode == 'TRAIN':
        indices = train_idx
    elif mode == 'TEST':
        indices = test_idx
    elif mode == 'EVAL':
        indices = eval_idx
    else:
        print("Wrong mode choice: ", mode)
        exit(1)

    dataset = tf.data.Dataset.from_generator(
        gen,
        ({model_input_name: tf.int64}, tf.int64),
        output_shapes=({model_input_name: [input_size, ]}, []))

    # Shuffle, batch, map, prefetch and repeat your dataset.
    # If you need to do some preprocessing on the data, create your function on
    # the cell above, and call it within a map() function.
    dataset = dataset.shuffle(buffer_size=batch_size * 50)
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(_reshape_labels)
    dataset = dataset.map(_int2float)  # user-defined cast to float, not shown here
    # Map on whatever other functions you need, e.g.:
    # dataset = dataset.map( ... )
    dataset = dataset.prefetch(2)
    dataset = dataset.repeat(epochs)

    iterator = dataset.make_one_shot_iterator()
    return iterator.get_next()

# Get train, test, eval indices for the given dataset
def train_test_set_idx_split(train_rt, test_rt, eval_rt):
    """ This function returns indices for the train, test and evaluation sets,
        given an input Dataset.
        Arguments:
            train_rt: ratio of the train dataset
            test_rt: ratio of the test dataset
            eval_rt: ratio of the evaluation dataset
        Returns:
            train_idx: indices (of the given dataset) for the train dataset
            test_idx: indices (of the given dataset) for the test dataset
            eval_idx: indices (of the given dataset) for the evaluation dataset
        Note:
            This function will work correctly as long as (test_rt == eval_rt) is True.
            If you need (test_rt != eval_rt), you need something more sophisticated.
    """
    with lp.connect(FILE, 'r') as ds:
        idx = np.array(range(0, ds.shape[1]))

    train_idx, test_idx = train_test_split(idx, train_size=train_rt,
                                           test_size=test_rt + eval_rt)
    test_idx, eval_idx = train_test_split(test_idx, train_size=0.5, test_size=0.5)
    return train_idx, test_idx, eval_idx

# Reshape labels as needed
def _reshape_labels(data, labels):
    return data, tf.reshape(labels, (-1, 1))
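
In case it helps, this is roughly how such input functions can be wired into a tf.estimator model. The feature column and the DNNClassifier below are only a hypothetical example, not the model from this answer; model_input_name would have to be set to a real, non-empty feature name and n_classes to the actual number of cell types.

# Hypothetical wiring; assumes model_input_name is a non-empty feature name
# and that the generator's labels are integer class ids.
feature_columns = [tf.feature_column.numeric_column(model_input_name,
                                                    shape=(input_size,))]

estimator = tf.estimator.DNNClassifier(hidden_units=[256, 128],
                                       feature_columns=feature_columns,
                                       n_classes=10)  # set to the real number of classes

estimator.train(input_fn=train_input_fn)
print(estimator.evaluate(input_fn=eval_input_fn))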



What is missing between Dask and machine-learning frameworks like TensorFlow is essentially a distributed in-memory cache.

Dask is a very good choice for the preprocessing part, but how to hand the preprocessed data over to TensorFlow is the remaining concern.

Vineyard (v6d.io) solves this intermediate data-sharing problem by providing integrations with data engines like Dask and ML engines like TensorFlow and PyTorch. Here is an example (https://v6d.io/examples/distributed-learning.html) showing how to transfer preprocessed data from Dask to horovod.keras; hope it helps.

