62 changes: 51 additions & 11 deletions bigframes/core/__init__.py
@@ -144,21 +144,56 @@ def mem_expr_from_pandas(
"""
Builds an in-memory only (SQL only) expr from a pandas dataframe.

Caution: If session is None, only a subset of expr functionality will be available (null Session is usually not supported).
Caution: If session is None, only a subset of expr functionality will
be available (null Session is usually not supported).
"""
# must set non-null column labels. these are not the user-facing labels
pd_df = pd_df.set_axis(
[column or bigframes.core.guid.generate_guid() for column in pd_df.columns],
axis="columns",
)
# We can't include any hidden columns in the ArrayValue constructor, so
# grab the column names before we add the hidden ordering column.
column_names = [str(column) for column in pd_df.columns]
# Make sure column names are all strings.
pd_df = pd_df.set_axis(column_names, axis="columns")
pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})

# ibis memtable cannot handle NA, must convert to None
pd_df = pd_df.astype("object") # type: ignore
pd_df = pd_df.where(pandas.notnull(pd_df), None)

# NULL type isn't valid in BigQuery, so retry with an explicit schema in these cases.
keys_memtable = ibis.memtable(pd_df)
schema = keys_memtable.schema()
new_schema = []
for column_index, column in enumerate(schema):
if column == ORDER_ID_COLUMN:
new_type: ibis_dtypes.DataType = ibis_dtypes.int64
else:
column_type = schema[column]
# The autodetected type might not be one we can support, such
# as NULL type for empty rows, so convert to a type we do
# support.
new_type = bigframes.dtypes.bigframes_dtype_to_ibis_dtype(
bigframes.dtypes.ibis_dtype_to_bigframes_dtype(column_type)
)
# TODO(swast): Ibis memtable doesn't use backticks in struct
# field names, so spaces and other characters aren't allowed in
# the memtable context. Blocked by
# https://github.com/ibis-project/ibis/issues/7187
column = f"col_{column_index}"
new_schema.append((column, new_type))

# must set non-null column labels. these are not the user-facing labels
pd_df = pd_df.set_axis(
[column for column, _ in new_schema],
axis="columns",
)
keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema))

return cls(
session, # type: ignore # Session cannot normally be none, see "caution" above
keys_memtable,
columns=[
keys_memtable[f"col_{column_index}"].name(column)
for column_index, column in enumerate(column_names)
],
ordering=ExpressionOrdering(
ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)],
total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
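
Note: the retry above exists because `ibis.memtable` infers a NULL type for all-`None` columns, and NULL is not a valid BigQuery column type. A minimal sketch of the failure and the explicit-schema fix (the DataFrame and column names here are illustrative, not part of this change):

```python
import ibis
import pandas as pd

# An all-None column: ibis.memtable infers its type as NULL, which BigQuery
# cannot represent.
df = pd.DataFrame({"a": [None, None], "b": [1, 2]})
print(ibis.memtable(df).schema())  # "a" is inferred as null

# Passing an explicit schema overrides the inference; falling back to STRING
# for the NULL column mirrors the bigframes dtype round-trip in the loop above.
fixed = ibis.memtable(df, schema=ibis.schema({"a": "string", "b": "int64"}))
print(fixed.schema())  # a: string, b: int64
```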
@@ -426,11 +461,16 @@ def shape(self) -> typing.Tuple[int, int]:
width = len(self.columns)
count_expr = self._to_ibis_expr(ordering_mode="unordered").count()
sql = self._session.ibis_client.compile(count_expr)
row_iterator, _ = self._session._start_query(
sql=sql,
max_results=1,
)
length = next(row_iterator)[0]

# Support in-memory engines for hermetic unit tests.
if not isinstance(sql, str):
length = self._session.ibis_client.execute(count_expr)
else:
row_iterator, _ = self._session._start_query(
sql=sql,
max_results=1,
)
length = next(row_iterator)[0]
return (length, width)

def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
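
The `isinstance(sql, str)` check works because only SQL-producing backends compile to a string; in-memory backends return an un-compiled expression, so the expression is executed directly. A hedged sketch with ibis's pandas backend (the table name and data are illustrative):

```python
import ibis
import pandas as pd

con = ibis.pandas.connect({"t": pd.DataFrame({"x": [1, 2, 3]})})
count_expr = con.table("t").count()

# The pandas backend has no SQL text to return, so compile() does not yield a
# str; in that case, execute the expression in-process instead.
compiled = con.compile(count_expr)
if not isinstance(compiled, str):
    length = con.execute(count_expr)
    print(length)  # 3
```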
55 changes: 27 additions & 28 deletions bigframes/core/blocks.py
@@ -44,6 +44,7 @@
import bigframes.dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import third_party.bigframes_vendored.pandas.io.common as vendored_pandas_io_common

# Type constraint for wherever column labels are used
Label = typing.Hashable
@@ -1522,37 +1523,35 @@ def _is_monotonic(
return result


def block_from_local(data, session=None, use_index=True) -> Block:
# TODO(tbergeron): Handle duplicate column labels
def block_from_local(data, session=None) -> Block:
pd_data = pd.DataFrame(data)
columns = pd_data.columns

column_labels = list(pd_data.columns)
if not all((label is None) or isinstance(label, str) for label in column_labels):
raise NotImplementedError(
f"Only string column labels supported. {constants.FEEDBACK_LINK}"
)
# Make a flattened version to treat as a table.
if len(pd_data.columns.names) > 1:
pd_data.columns = columns.to_flat_index()

if use_index:
if pd_data.index.nlevels > 1:
raise NotImplementedError(
f"multi-indices not supported. {constants.FEEDBACK_LINK}"
)
index_label = pd_data.index.name

index_id = guid.generate_guid()
pd_data = pd_data.reset_index(names=index_id)
keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
return Block(
keys_expr,
column_labels=column_labels,
index_columns=[index_id],
index_labels=[index_label],
)
else:
keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
keys_expr, offsets_id = keys_expr.promote_offsets()
# Constructor will create default range index
return Block(keys_expr, index_columns=[offsets_id], column_labels=column_labels)
index_labels = list(pd_data.index.names)
# The ArrayValue layer doesn't know about indexes, so make sure indexes
# are real columns with unique IDs.
pd_data = pd_data.reset_index(
names=[f"level_{level}" for level in range(len(index_labels))]
)
pd_data = pd_data.set_axis(
vendored_pandas_io_common.dedup_names(
list(pd_data.columns), is_potential_multiindex=False
),
axis="columns",
)
index_ids = pd_data.columns[: len(index_labels)]

keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
return Block(
keys_expr,
column_labels=columns,
index_columns=index_ids,
index_labels=index_labels,
)


def _align_block_to_schema(
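
`dedup_names` is vendored from pandas' CSV reader and mangles repeated labels with a `.N` suffix, which is what lets duplicate column labels (and the synthetic `level_N` index columns) become unique column IDs. A simplified sketch of those semantics (`dedup_names_sketch` is illustrative; the vendored helper also handles collisions with pre-existing `x.1`-style names):

```python
from typing import Hashable, List


def dedup_names_sketch(names: List[Hashable]) -> List[Hashable]:
    """Mangle duplicate labels pandas-style: x, x, x -> x, x.1, x.2."""
    seen: dict = {}
    out = []
    for name in names:
        count = seen.get(name, 0)
        seen[name] = count + 1
        out.append(f"{name}.{count}" if count else name)
    return out


print(dedup_names_sketch(["some col", "some col", "level_0"]))
# ['some col', 'some col.1', 'level_0']
```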
4 changes: 1 addition & 3 deletions bigframes/dataframe.py
@@ -65,9 +65,7 @@

# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
# TODO(tbergeron): Convert to bytes-based limit
# TODO(swast): Address issues with string escaping and empty tables before
# re-enabling inline data (ibis.memtable) feature.
MAX_INLINE_DF_SIZE = -1
MAX_INLINE_DF_SIZE = 5000

LevelType = typing.Union[str, int]
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
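
Restoring `MAX_INLINE_DF_SIZE = 5000` re-enables embedding small frames directly in the query text via `ibis.memtable` rather than a load job. A back-of-envelope check of the comment's 10% claim, plus a hypothetical sketch of the gate (`should_inline` is illustrative; the real check lives in the DataFrame constructor path):

```python
import pandas as pd

MAX_INLINE_DF_SIZE = 5000

# ~20 characters per rendered SQL literal puts 5000 items near 100 kB,
# roughly 10% of BigQuery's 1 MB query size limit.
print(5000 * 20 / 1_000_000)  # 0.1


def should_inline(pd_df: pd.DataFrame) -> bool:
    # Small frames inline via ibis.memtable; larger ones use a load job.
    return pd_df.size <= MAX_INLINE_DF_SIZE
```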
21 changes: 13 additions & 8 deletions bigframes/dtypes.py
@@ -96,13 +96,13 @@
),
)

BIGFRAMES_TO_IBIS: Dict[Dtype, IbisDtype] = {
BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = {
pandas: ibis for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}

IBIS_TO_BIGFRAMES: Dict[
Union[IbisDtype, ReadOnlyIbisDtype], Union[Dtype, np.dtype[Any]]
] = {ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS}
IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Union[Dtype, np.dtype[Any]]] = {
ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}
# Allow REQUIRED fields to map correctly.
IBIS_TO_BIGFRAMES.update(
{ibis.copy(nullable=False): pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS}
@@ -130,7 +130,7 @@


def ibis_dtype_to_bigframes_dtype(
ibis_dtype: Union[IbisDtype, ReadOnlyIbisDtype]
ibis_dtype: ibis_dtypes.DataType,
) -> Union[Dtype, np.dtype[Any]]:
"""Converts an Ibis dtype to a BigQuery DataFrames dtype

@@ -155,6 +155,9 @@ def ibis_dtype_to_bigframes_dtype(

if ibis_dtype in IBIS_TO_BIGFRAMES:
return IBIS_TO_BIGFRAMES[ibis_dtype]
elif isinstance(ibis_dtype, ibis_dtypes.Null):
# Fallback to STRING for NULL values for most flexibility in SQL.
return IBIS_TO_BIGFRAMES[ibis_dtypes.string]
else:
raise ValueError(
f"Unexpected Ibis data type {ibis_dtype}. {constants.FEEDBACK_LINK}"
@@ -185,8 +188,8 @@ def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:


def bigframes_dtype_to_ibis_dtype(
bigframes_dtype: Union[DtypeString, Dtype]
) -> IbisDtype:
bigframes_dtype: Union[DtypeString, Dtype, np.dtype[Any]]
) -> ibis_dtypes.DataType:
"""Converts a BigQuery DataFrames supported dtype to an Ibis dtype.

Args:
@@ -281,7 +284,9 @@ def literal_to_ibis_scalar(
return scalar_expr


def cast_ibis_value(value: ibis_types.Value, to_type: IbisDtype) -> ibis_types.Value:
def cast_ibis_value(
value: ibis_types.Value, to_type: ibis_dtypes.DataType
) -> ibis_types.Value:
"""Perform compatible type casts of ibis values

Args:
4 changes: 1 addition & 3 deletions bigframes/operations/base.py
@@ -30,9 +30,7 @@

# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
# TODO(tbergeron): Convert to bytes-based limit
# TODO(swast): Address issues with string escaping and empty tables before
# re-enabling inline data (ibis.memtable) feature.
MAX_INLINE_SERIES_SIZE = -1
MAX_INLINE_SERIES_SIZE = 5000


class SeriesMethods:
4 changes: 2 additions & 2 deletions setup.py
@@ -44,12 +44,12 @@
"google-cloud-resource-manager >=1.10.3",
"google-cloud-storage >=2.0.0",
# TODO: Relax upper bound once we have fixed `system_prerelease` tests.
"ibis-framework[bigquery] >=6.0.0,<=6.1.0",
"ibis-framework[bigquery] >=6.2.0,<7.0.0dev",
"pandas >=1.5.0",
"pydata-google-auth >=1.8.2",
"requests >=2.27.1",
"scikit-learn >=1.2.2",
"sqlalchemy >=1.4,<3.0",
"sqlalchemy >=1.4,<3.0dev",
"ipywidgets >=7.7.1",
"humanize >= 4.6.0",
]
2 changes: 1 addition & 1 deletion testing/constraints-3.9.txt
@@ -45,7 +45,7 @@ greenlet==2.0.2
grpc-google-iam-v1==0.12.6
grpcio==1.53.0
grpcio-status==1.48.2
ibis-framework==6.0.0
ibis-framework==6.2.0
humanize==4.6.0
identify==2.5.22
idna==3.4
13 changes: 13 additions & 0 deletions tests/unit/core/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
85 changes: 85 additions & 0 deletions tests/unit/core/test_blocks.py
@@ -0,0 +1,85 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas
import pandas.testing
import pytest

import bigframes.core.blocks as blocks

from .. import resources


@pytest.mark.parametrize(
("data",),
(
pytest.param(
{"test 1": [1, 2, 3], "test 2": [0.25, 0.5, 0.75]},
id="dict_spaces_in_column_names",
),
pytest.param(
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]],
id="nested_list",
),
pytest.param(
pandas.concat(
[
pandas.Series([1, 2, 3], name="some col"),
pandas.Series([2, 3, 4], name="some col"),
],
axis="columns",
),
id="duplicate_column_names",
),
pytest.param(
pandas.DataFrame(
{"test": [1, 2, 3]},
index=pandas.Index(["a", "b", "c"], name="string index"),
),
id="string_index",
),
pytest.param(
pandas.DataFrame(
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]],
columns=pandas.MultiIndex.from_tuples(
[(1, 1), (1, 2), (0, 0), (0, 1)],
names=["some level", "another level"],
),
),
marks=[
pytest.mark.skipif(
tuple(pandas.__version__.split(".")) < ("2", "0", "0"),
reason="pandas 1.5.3 treats column MultiIndex as Index of tuples",
),
],
id="multiindex_columns",
),
pytest.param(
pandas.DataFrame(
{"test": [1, 2, 3]},
index=pandas.MultiIndex.from_tuples([(1, 1), (1, 2), (0, 0)]),
),
id="multiindex_rows",
),
),
)
def test_block_from_local(data):
expected = pandas.DataFrame(data)
session = resources.create_pandas_session({})

block = blocks.block_from_local(data, session=session)

pandas.testing.assert_index_equal(block.column_labels, expected.columns)
assert tuple(block.index_labels) == tuple(expected.index.names)
assert block.shape == expected.shape