Unify data in your entire machine learning lifecycle with Space, a comprehensive storage solution that seamlessly handles data from ingestion to training.
Key Features:
- Ground Truth Database
- Store and manage multimodal data in open source file formats, row or columnar, local or in cloud.
- Ingest from various sources, including ML datasets, files, and labeling tools.
- Support data manipulation (append, insert, update, delete) and version control.
- OLAP Database and Lakehouse
- Iceberg style open table format.
- Optimized for unstructued data via reference operations.
- Quickly analyze data using SQL engines like DuckDB.
- Distributed Data Processing Pipelines
- Integrate with processing frameworks like Ray for efficient data transformation.
- Store processed results as Materialized Views (MVs); incrementally update MVs when the source is changed.
- Seamless Training Framework Integration
- Access Space datasets and MVs directly via random access interfaces.
- Convert to popular ML dataset formats (e.g., TFDS, HuggingFace, Ray).
- Manage Tensorflow COCO dataset
- Ground truth database of LabelStudio
- Transforms and materialized views: Segment Anything as example
- Incrementally build embedding vector indexes
- Parallel ingestion from WebDataset
- Convert from/to HuggingFace datasets
- Space uses Arrow in the API surface, e.g., schema, filter, data IO.
- All file paths in Space are relative; datasets are immediately usable after downloading or moving.
- Space stores data itself, or a reference of data, in Parquet files. The reference can be the address of a row in ArrayRecord file, or the path of a standalone file (limitted support, see
space.core.schema.types.files). space.TfFeaturesis a built-in field type providing serializers for nested dicts of numpy arrays, based on TFDS FeaturesDict.- Please find more information in the design and performance docs.
- Install
- Cluster Setup and Performance Tuning
- Create and Load Datasets
- Write and Read
- Transform and Materialized Views
- ML Frameworks Integration
- Inspect Metadata
Install:
pip install space-datasetsOr install from code:
cd python pip install .[dev]See the setup and performance doc.
Create a Space dataset with two index fields (id, image_name) (store in Parquet) and a record field (feature) (store in ArrayRecord).
This example uses the plain binary type for the record field. Space supports a type space.TfFeatures that integrates with the TFDS feature serializer. See more details in a TFDS example.
import pyarrow as pa from space import Dataset schema = pa.schema([ ("id", pa.int64()), ("image_name", pa.string()), ("feature", pa.binary())]) ds = Dataset.create( "/path/to/<mybucket>/example_ds", schema, primary_keys=["id"], record_fields=["feature"]) # Store this field in ArrayRecord files # Load the dataset from files later: ds = Dataset.load("/path/to/<mybucket>/example_ds")Optionally, you can use catalogs to manage datasets by names instead of locations:
from space import DirCatalog # DirCatalog manages datasets in a directory. catalog = DirCatalog("/path/to/<mybucket>") # Same as the creation above. ds = catalog.create_dataset("example_ds", schema, primary_keys=["id"], record_fields=["feature"]) # Same as the load above. ds = catalog.dataset("example_ds") # List all datasets and materialized views. print(catalog.datasets())Append, delete some data. Each mutation generates a new version of data, represented by an increasing integer ID. Users can add tags to version IDs as alias.
import pyarrow.compute as pc from space import RayOptions # Create a local runner: runner = ds.local() # Or create a Ray runner: runner = ds.ray(ray_options=RayOptions(max_parallelism=8)) # To avoid https://github.com/ray-project/ray/issues/41333, wrap the runner # with @ray.remote when running in a remote Ray cluster. # # @ray.remote # def run(): # return runner.read_all() # # Appending data generates a new dataset version `snapshot_id=1` # Write methods: # - append(...): no primary key check. # - insert(...): fail if primary key exists. # - upsert(...): overwrite if primary key exists. ids = range(100) runner.append({ "id": ids, "image_name": [f"{i}.jpg" for i in ids], "feature": [f"somedata{i}".encode("utf-8") for i in ids] }) ds.add_tag("after_append") # Version management: add tag to snapshot # Deletion generates a new version `snapshot_id=2` runner.delete(pc.field("id") == 1) ds.add_tag("after_delete") # Show all versions ds.versions().to_pandas() # >>> # snapshot_id create_time tag_or_branch # 0 2 2024-01-12 20:23:57+00:00 after_delete # 1 1 2024-01-12 20:23:38+00:00 after_append # 2 0 2024-01-12 20:22:51+00:00 None # Read options: # - filter_: optional, apply a filter (push down to reader). # - fields: optional, field selection. # - version: optional, snapshot_id or tag, time travel back to an old version. # - batch_size: optional, output size. runner.read_all( filter_=pc.field("image_name")=="2.jpg", fields=["feature"], version="after_add" # or snapshot ID `1` ) # Read the changes between version 0 and 2. for change in runner.diff(0, "after_delete"): print(change.change_type) print(change.data) print("===============")Create a new branch and make changes in the new branch:
# The default branch is "main" ds.add_branch("dev") ds.set_current_branch("dev") # Make changes in the new branch, the main branch is not updated. # Switch back to the main branch. ds.set_current_branch("main")Space supports transforming a dataset to a view, and materializing the view to files. The transforms include:
- Mapping batches using a user defined function (UDF).
- Filter using a UDF.
- Joining two views/datasets.
When the source dataset is modified, refreshing the materialized view incrementally synchronizes changes, which saves compute and IO cost. See more details in a Segment Anything example. Reading or refreshing views must be the Ray runner, because they are implemented based on Ray transform.
A materialized view mv can be used as a view mv.view or a dataset mv.dataset. The former always reads data from the source dataset's files and processes all data on-the-fly. The latter directly reads processed data from the MV's files, skips processing data.
# A sample transform UDF. # Input is {"field_name": [values, ...], ...} def modify_feature_udf(batch): batch["feature"] = [d + b"123" for d in batch["feature"]] return batch # Create a view and materialize it. view = ds.map_batches( fn=modify_feature_udf, output_schema=ds.schema, output_record_fields=["feature"] ) view_runner = view.ray() # Reading a view will read the source dataset and apply transforms on it. # It processes all data using `modify_feature_udf` on the fly. for d in view_runner.read(): print(d) mv = view.materialize("/path/to/<mybucket>/example_mv") # Or use a catalog: # mv = catalog.materialize("example_mv", view) mv_runner = mv.ray() # Refresh the MV up to version tag `after_add` of the source. mv_runner.refresh("after_add", batch_size=64) # Reading batch size # Or, mv_runner.refresh() refresh to the latest version # Use the MV runner instead of view runner to directly read from materialized # view files, no data processing any more. mv_runner.read_all()See a full example in the Segment Anything example. Creating a materialized view of join result is not supported yet.
# If input is a materialized view, using `mv.dataset` instead of `mv.view` # Only support 1 join key, it must be primary key of both left and right. joined_view = mv_left.dataset.join(mv_right.dataset, keys=["id"])There are several ways to integrate Space storage with ML frameworks. Space provides a random access data source for reading data in ArrayRecord files:
from space import RandomAccessDataSource datasource = RandomAccessDataSource( # <field-name>: <storage-location>, for reading data from ArrayRecord files. { "feature": "/path/to/<mybucket>/example_mv", }, # Don't auto deserialize data, because we store them as plain bytes. deserialize=False) len(datasource) datasource[2]A dataset or view can also be read as a Ray dataset:
ray_ds = ds.ray_dataset() ray_ds.take(2)Data in Parquet files can be read as a HuggingFace dataset:
from datasets import load_dataset huggingface_ds = load_dataset("parquet", data_files={"train": ds.index_files()})List file path of all index (Parquet) files:
ds.index_files() # Or show more statistics information of Parquet files. ds.storage.index_manifest() # Accept filter and snapshot_idShow statistics information of all ArrayRecord files:
ds.storage.record_manifest() # Accept filter and snapshot_idSpace is a new project under active development.
🚧 Ongoing tasks:
- Performance benchmark and improvement.
This is not an officially supported Google product.
