This repository was archived by the owner on May 6, 2025. It is now read-only.

google/space

Space: Unified Storage for Machine Learning

Unify data in your entire machine learning lifecycle with Space, a comprehensive storage solution that seamlessly handles data from ingestion to training.

Key Features:

  • Ground Truth Database
    • Store and manage multimodal data in open source file formats, row- or column-oriented, locally or in the cloud.
    • Ingest from various sources, including ML datasets, files, and labeling tools.
    • Support data manipulation (append, insert, update, delete) and version control.
  • OLAP Database and Lakehouse
  • Distributed Data Processing Pipelines
    • Integrate with processing frameworks like Ray for efficient data transformation.
    • Store processed results as Materialized Views (MVs); incrementally update MVs when the source dataset changes.
  • Seamless Training Framework Integration
    • Access Space datasets and MVs directly via random access interfaces.
    • Convert to popular ML dataset formats (e.g., TFDS, HuggingFace, Ray).

Onboarding Examples

Space 101

  • Space uses Arrow in the API surface, e.g., schema, filter, data IO.
  • All file paths in Space are relative; datasets are immediately usable after downloading or moving.
  • Space stores data itself, or a reference to data, in Parquet files. The reference can be the address of a row in an ArrayRecord file, or the path of a standalone file (limited support, see space.core.schema.types.files).
  • space.TfFeatures is a built-in field type providing serializers for nested dicts of numpy arrays, based on TFDS FeaturesDict.
  • Please find more information in the design and performance docs.
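The reference mechanism above can be pictured with a small sketch. This is a conceptual illustration only, not Space's actual Parquet layout: a record field's value in an index row is modeled as a hypothetical `RecordAddress` pointing into an ArrayRecord file.

```python
from dataclasses import dataclass

# Conceptual sketch only: Space's real on-disk reference format is internal.
@dataclass(frozen=True)
class RecordAddress:
    file_path: str   # relative path, so the dataset stays relocatable
    row_index: int   # position of the record inside the ArrayRecord file

def resolve(address, array_record_files):
    """Look up the payload a reference points at (in-memory stand-in)."""
    return array_record_files[address.file_path][address.row_index]

# Tiny in-memory stand-in for ArrayRecord files:
files = {"data/records-00000.array_record": [b"img0", b"img1", b"img2"]}
addr = RecordAddress("data/records-00000.array_record", 1)
print(resolve(addr, files))  # b'img1'
```

Because the path in the reference is relative, moving the whole dataset directory leaves every reference valid, which is what makes downloaded or copied datasets immediately usable.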

Quick Start

Install

```shell
pip install space-datasets
```

Or install from code:

```shell
cd python
pip install .[dev]
```

Cluster Setup and Performance Tuning

See the setup and performance doc.

Create and Load Datasets

Create a Space dataset with two index fields (id, image_name) (stored in Parquet) and a record field (feature) (stored in ArrayRecord).

This example uses the plain binary type for the record field. Space supports a type space.TfFeatures that integrates with the TFDS feature serializer. See more details in a TFDS example.

```python
import pyarrow as pa
from space import Dataset

schema = pa.schema([
    ("id", pa.int64()),
    ("image_name", pa.string()),
    ("feature", pa.binary())])

ds = Dataset.create(
    "/path/to/<mybucket>/example_ds",
    schema,
    primary_keys=["id"],
    record_fields=["feature"])  # Store this field in ArrayRecord files

# Load the dataset from files later:
ds = Dataset.load("/path/to/<mybucket>/example_ds")
```

Optionally, you can use catalogs to manage datasets by names instead of locations:

```python
from space import DirCatalog

# DirCatalog manages datasets in a directory.
catalog = DirCatalog("/path/to/<mybucket>")

# Same as the creation above.
ds = catalog.create_dataset(
    "example_ds", schema,
    primary_keys=["id"], record_fields=["feature"])

# Same as the load above.
ds = catalog.dataset("example_ds")

# List all datasets and materialized views.
print(catalog.datasets())
```

Write and Read

Append and delete data. Each mutation generates a new version of the data, represented by an increasing integer ID. Users can add tags to version IDs as aliases.

```python
import pyarrow.compute as pc
from space import RayOptions

# Create a local runner:
runner = ds.local()

# Or create a Ray runner:
runner = ds.ray(ray_options=RayOptions(max_parallelism=8))

# To avoid https://github.com/ray-project/ray/issues/41333, wrap the runner
# with @ray.remote when running in a remote Ray cluster.
#
# @ray.remote
# def run():
#   return runner.read_all()

# Appending data generates a new dataset version `snapshot_id=1`.
# Write methods:
# - append(...): no primary key check.
# - insert(...): fail if primary key exists.
# - upsert(...): overwrite if primary key exists.
ids = range(100)
runner.append({
    "id": ids,
    "image_name": [f"{i}.jpg" for i in ids],
    "feature": [f"somedata{i}".encode("utf-8") for i in ids]
})
ds.add_tag("after_append")  # Version management: add a tag to a snapshot

# Deletion generates a new version `snapshot_id=2`.
runner.delete(pc.field("id") == 1)
ds.add_tag("after_delete")

# Show all versions.
ds.versions().to_pandas()
# >>>
#    snapshot_id               create_time tag_or_branch
# 0            2 2024-01-12 20:23:57+00:00  after_delete
# 1            1 2024-01-12 20:23:38+00:00  after_append
# 2            0 2024-01-12 20:22:51+00:00          None

# Read options:
# - filter_: optional, apply a filter (pushed down to the reader).
# - fields: optional, field selection.
# - version: optional, snapshot_id or tag; time travel back to an old version.
# - batch_size: optional, output batch size.
runner.read_all(
    filter_=pc.field("image_name") == "2.jpg",
    fields=["feature"],
    version="after_append"  # or snapshot ID `1`
)

# Read the changes between version 0 and 2.
for change in runner.diff(0, "after_delete"):
    print(change.change_type)
    print(change.data)
    print("===============")
```
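The snapshot-and-tag model above can be sketched in a few lines of plain Python. This is a hypothetical, in-memory model for intuition only, not Space's implementation: every mutation appends a new snapshot, and tags are simple aliases for snapshot IDs, which is what makes time travel a lookup rather than a recomputation.

```python
# Hypothetical, minimal model of snapshot-based version control.
class VersionLog:
    def __init__(self):
        self.snapshots = [{}]  # snapshot 0: empty dataset state
        self.tags = {}         # tag name -> snapshot_id

    def append(self, rows):
        state = dict(self.snapshots[-1])
        state.update(rows)
        self.snapshots.append(state)
        return len(self.snapshots) - 1  # new snapshot_id

    def delete(self, keys):
        state = {k: v for k, v in self.snapshots[-1].items() if k not in keys}
        self.snapshots.append(state)
        return len(self.snapshots) - 1

    def add_tag(self, tag):
        self.tags[tag] = len(self.snapshots) - 1

    def read(self, version=None):
        if version is None:
            version = len(self.snapshots) - 1  # latest
        snapshot_id = self.tags.get(version, version)  # tag or raw ID
        return self.snapshots[snapshot_id]

log = VersionLog()
log.append({i: f"{i}.jpg" for i in range(3)})
log.add_tag("after_append")
log.delete({1})
log.add_tag("after_delete")
print(sorted(log.read("after_append")))  # [0, 1, 2]
print(sorted(log.read("after_delete")))  # [0, 2]
```

A diff between two versions falls out of the same structure: compare the two snapshot states and emit the added and removed keys.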

Create a new branch and make changes in the new branch:

```python
# The default branch is "main".
ds.add_branch("dev")
ds.set_current_branch("dev")

# Make changes in the new branch; the main branch is not updated.

# Switch back to the main branch.
ds.set_current_branch("main")
```

Transform and Materialized Views

Space supports transforming a dataset to a view, and materializing the view to files. The transforms include:

  • Mapping batches using a user defined function (UDF).
  • Filter using a UDF.
  • Joining two views/datasets.

When the source dataset is modified, refreshing the materialized view incrementally synchronizes the changes, which saves compute and IO cost. See more details in a Segment Anything example. Reading or refreshing views must use the Ray runner, because they are implemented on top of Ray transforms.
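The incremental-refresh idea can be illustrated with a small sketch. This is a hypothetical stand-in, not Space's implementation: instead of recomputing the whole view, the transform is applied only to the diff (the changes) since the last refreshed source version.

```python
# Hypothetical sketch of incremental materialized-view refresh.
def transform(value):
    # Stand-in for a user-defined transform (UDF).
    return value + b"123"

def incremental_refresh(mv, changes):
    """Apply a stream of (change_type, key, value) diffs to the MV state."""
    for change_type, key, value in changes:
        if change_type == "add":
            mv[key] = transform(value)   # only new rows are processed
        elif change_type == "delete":
            mv.pop(key, None)            # deletions need no recomputation
    return mv

mv = {}
incremental_refresh(mv, [("add", 0, b"a"), ("add", 1, b"b")])
incremental_refresh(mv, [("delete", 1, None)])
print(mv)  # {0: b'a123'}
```

The cost of a refresh is proportional to the size of the diff rather than the size of the source dataset, which is where the compute and IO savings come from.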

A materialized view mv can be used as a view mv.view or a dataset mv.dataset. The former always reads data from the source dataset's files and processes all data on the fly. The latter reads processed data directly from the MV's files, skipping data processing.

Example of map_batches

```python
# A sample transform UDF.
# Input is {"field_name": [values, ...], ...}
def modify_feature_udf(batch):
    batch["feature"] = [d + b"123" for d in batch["feature"]]
    return batch

# Create a view and materialize it.
view = ds.map_batches(
    fn=modify_feature_udf,
    output_schema=ds.schema,
    output_record_fields=["feature"]
)

view_runner = view.ray()
# Reading a view reads the source dataset and applies transforms on it.
# It processes all data using `modify_feature_udf` on the fly.
for d in view_runner.read():
    print(d)

mv = view.materialize("/path/to/<mybucket>/example_mv")
# Or use a catalog:
# mv = catalog.materialize("example_mv", view)

mv_runner = mv.ray()
# Refresh the MV up to version tag `after_append` of the source.
mv_runner.refresh("after_append", batch_size=64)  # Reading batch size
# Or, mv_runner.refresh() refreshes to the latest version.

# Use the MV runner instead of the view runner to read directly from
# materialized view files, with no further data processing.
mv_runner.read_all()
```

Example of join

See a full example in the Segment Anything example. Creating a materialized view of a join result is not supported yet.

```python
# If an input is a materialized view, use `mv.dataset` instead of `mv.view`.
# Only one join key is supported; it must be the primary key of both the
# left and right inputs.
joined_view = mv_left.dataset.join(mv_right.dataset, keys=["id"])
```
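The single-primary-key restriction makes the join semantics simple to picture. A hedged sketch, not Space's implementation: because the key is a primary key on both sides, it matches at most one row per side, so the join reduces to a dictionary lookup.

```python
# Hypothetical sketch of a single-key, primary-key equi-join.
def pk_join(left, right):
    """left/right: {primary_key: row_dict}; merge rows for keys on both sides."""
    return {k: {**left[k], **right[k]} for k in left if k in right}

left = {1: {"image": "1.jpg"}, 2: {"image": "2.jpg"}}
right = {2: {"feature": b"f2"}, 3: {"feature": b"f3"}}
print(pk_join(left, right))  # {2: {'image': '2.jpg', 'feature': b'f2'}}
```

Keys present on only one side (1 and 3 above) are dropped, i.e., the result is an inner join on the primary key.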

ML Frameworks Integration

There are several ways to integrate Space storage with ML frameworks. Space provides a random access data source for reading data in ArrayRecord files:

```python
from space import RandomAccessDataSource

datasource = RandomAccessDataSource(
    # <field-name>: <storage-location>, for reading data from ArrayRecord files.
    {
        "feature": "/path/to/<mybucket>/example_mv",
    },
    # Don't auto deserialize data, because we store them as plain bytes.
    deserialize=False)

len(datasource)
datasource[2]
```
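The interface above follows Python's standard sequence protocol, which is why it plugs into index-based training-data loaders. A minimal stand-in, assuming only that a data source exposes `__len__` and `__getitem__`:

```python
# Hypothetical in-memory stand-in for a random access data source.
class InMemoryDataSource:
    def __init__(self, records):
        # Stand-in for rows addressed in ArrayRecord files.
        self._records = records

    def __len__(self):
        return len(self._records)

    def __getitem__(self, index):
        return self._records[index]

datasource = InMemoryDataSource([b"somedata0", b"somedata1", b"somedata2"])
print(len(datasource))  # 3
print(datasource[2])    # b'somedata2'
```

Any loader that shuffles or samples by integer index (as common training frameworks do) can consume such an object without reading the whole dataset sequentially.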

A dataset or view can also be read as a Ray dataset:

```python
ray_ds = ds.ray_dataset()
ray_ds.take(2)
```

Data in Parquet files can be read as a HuggingFace dataset:

```python
from datasets import load_dataset

huggingface_ds = load_dataset(
    "parquet", data_files={"train": ds.index_files()})
```

Inspect Metadata

List the file paths of all index (Parquet) files:

```python
ds.index_files()
# Or show more statistics information of the Parquet files.
ds.storage.index_manifest()  # Accepts filter and snapshot_id
```

Show statistics information of all ArrayRecord files:

```python
ds.storage.record_manifest()  # Accepts filter and snapshot_id
```

Status

Space is a new project under active development.

🚧 Ongoing tasks:

  • Performance benchmark and improvement.

Disclaimer

This is not an officially supported Google product.
