Merged
2 changes: 1 addition & 1 deletion bigframes/bigquery/_operations/json.py
@@ -47,7 +47,7 @@ def json_set(
>>> s = bpd.read_gbq("SELECT JSON '{\\\"a\\\": 1}' AS data")["data"]
>>> bbq.json_set(s, json_path_value_pairs=[("$.a", 100), ("$.b", "hi")])
0 {"a":100,"b":"hi"}
Name: data, dtype: string
Name: data, dtype: large_string[pyarrow]

Args:
input (bigframes.series.Series):
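Not part of the diff, but for reference: a quick local illustration of the dtype string now shown in the doctest above. The series contents here are made up; only the dtype repr is the point.

import pandas as pd
import pyarrow as pa

# JSON values now surface with pd.ArrowDtype(pa.large_string()), which pandas
# renders as "large_string[pyarrow]" instead of the previous "string".
s = pd.Series(['{"a":100,"b":"hi"}'], dtype=pd.ArrowDtype(pa.large_string()), name="data")
print(s)
# 0    {"a":100,"b":"hi"}
# Name: data, dtype: large_string[pyarrow]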
2 changes: 1 addition & 1 deletion bigframes/core/__init__.py
@@ -107,7 +107,7 @@ def from_table(
raise ValueError("must set at most one of 'offests', 'primary_key'")
if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names):
warnings.warn(
"Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.",
"Interpreting JSON column(s) as StringDtype and pyarrow.large_string. This behavior may change in future versions.",
Contributor comment:

Shouldn't we just say "... as pyarrow.large_string"? StringDtype here is not relevant anymore, no?

bigframes.exceptions.PreviewWarning,
)
# define data source only for needed columns, this makes row-hashing cheaper
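If the warning text were trimmed as the comment suggests, it could read roughly as follows; the exact wording here is an assumption, not part of the merged change.

import warnings

import bigframes.exceptions

# Hypothetical wording per the review comment above: drop the StringDtype
# mention, since JSON columns now come back as pyarrow.large_string.
warnings.warn(
    "Interpreting JSON column(s) as pyarrow.large_string. "
    "This behavior may change in future versions.",
    bigframes.exceptions.PreviewWarning,
)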
4 changes: 2 additions & 2 deletions bigframes/core/blocks.py
@@ -2564,13 +2564,13 @@ def _get_rows_as_json_values(self) -> Block:
),
T1 AS (
SELECT *,
JSON_OBJECT(
TO_JSON_STRING(JSON_OBJECT(
Contributor comment:

Do we need an explicit TO_JSON_STRING? Doesn't the change in ibis_types.py::ibis_dtype_to_bigframes_dtype take care of this post read?

 if isinstance(ibis_dtype, ibis_dtypes.JSON): ... return bigframes.dtypes.JSON_DTYPE 
"names", [{column_names_csv}],
"types", [{column_types_csv}],
"values", [{column_references_csv}],
"indexlength", {index_columns_count},
"dtype", {pandas_row_dtype}
) AS {googlesql.identifier(row_json_column_name)} FROM T0
)) AS {googlesql.identifier(row_json_column_name)} FROM T0
)
SELECT {select_columns_csv} FROM T1
"""
4 changes: 2 additions & 2 deletions bigframes/core/compile/compiled.py
@@ -464,7 +464,7 @@ def _set_or_replace_by_id(
builder.columns = [*self.columns, new_value.name(id)]
return builder.build()

def _select(self, values: typing.Tuple[ibis_types.Value]) -> UnorderedIR:
def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> UnorderedIR:
builder = self.builder()
builder.columns = values
return builder.build()
@@ -1129,7 +1129,7 @@ def _set_or_replace_by_id(self, id: str, new_value: ibis_types.Value) -> Ordered
builder.columns = [*self.columns, new_value.name(id)]
return builder.build()

def _select(self, values: typing.Tuple[ibis_types.Value]) -> OrderedIR:
def _select(self, values: typing.Tuple[ibis_types.Value, ...]) -> OrderedIR:
"""Safely assign by id while maintaining ordering integrity."""
# TODO: Split into explicit set and replace methods
ordering_col_ids = set(
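The signature change above is a typing fix rather than a behavior change: Tuple[X] annotates a tuple of exactly one X, while Tuple[X, ...] annotates a homogeneous tuple of any length, which is what _select actually receives. A minimal sketch with generic names (not BigFrames code):

from typing import Tuple

def first_of_one(values: Tuple[int]) -> int:
    # Annotated as a 1-tuple: a type checker flags first_of_one((1, 2)).
    return values[0]

def total(values: Tuple[int, ...]) -> int:
    # Annotated as a variable-length tuple, matching how _select is called.
    return sum(values)

print(total((1, 2, 3)))  # 6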
8 changes: 4 additions & 4 deletions bigframes/core/compile/compiler.py
@@ -59,11 +59,11 @@ def compile_sql(
node = self.set_output_names(node, output_ids)
if ordered:
node, limit = rewrites.pullup_limit_from_slice(node)
return self.compile_ordered_ir(self._preprocess(node)).to_sql(
ordered=True, limit=limit
)
ir = self.compile_ordered_ir(self._preprocess(node))
return ir.to_sql(ordered=True, limit=limit)
else:
return self.compile_unordered_ir(self._preprocess(node)).to_sql()
ir = self.compile_unordered_ir(self._preprocess(node)) # type: ignore
return ir.to_sql()

def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)
11 changes: 3 additions & 8 deletions bigframes/core/compile/ibis_types.py
@@ -24,7 +24,6 @@
from bigframes_vendored.ibis.expr.datatypes.core import (
dtype as python_type_to_bigquery_type,
)
import bigframes_vendored.ibis.expr.operations as ibis_ops
import bigframes_vendored.ibis.expr.types as ibis_types
import geopandas as gpd # type: ignore
import google.cloud.bigquery as bigquery
@@ -46,6 +45,7 @@
ibis_dtypes.Binary,
ibis_dtypes.Decimal,
ibis_dtypes.GeoSpatial,
ibis_dtypes.JSON,
]


@@ -74,6 +74,7 @@
ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True),
gpd.array.GeometryDtype(),
),
(ibis_dtypes.json, pd.ArrowDtype(pa.large_string())),
)

BIGFRAMES_TO_IBIS: Dict[bigframes.dtypes.Dtype, ibis_dtypes.DataType] = {
@@ -219,12 +220,6 @@ def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
"""
ibis_type = value.type()
name = value.get_name()
if ibis_type.is_json():
value = ibis_ops.ToJsonString(value).to_expr() # type: ignore
value = (
value.case().when("null", bigframes_vendored.ibis.null()).else_(value).end()
)
return value.name(name)
# Allow REQUIRED fields to be joined with NULLABLE fields.
nullable_type = ibis_type.copy(nullable=True)
return value.cast(nullable_type).name(name)
@@ -314,7 +309,7 @@ def ibis_dtype_to_bigframes_dtype(
"Interpreting JSON as string. This behavior may change in future versions.",
bigframes.exceptions.PreviewWarning,
)
return bigframes.dtypes.STRING_DTYPE
return bigframes.dtypes.JSON_DTYPE

if ibis_dtype in IBIS_TO_BIGFRAMES:
return IBIS_TO_BIGFRAMES[ibis_dtype]
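Taken together, these ibis_types.py edits stop round-tripping JSON columns through ToJsonString and instead map the ibis JSON dtype straight to the new BigFrames JSON dtype; this is the read-side mapping the TO_JSON_STRING comment above refers to. A paraphrased sketch of that branch (illustrative function name; the real logic lives in ibis_dtype_to_bigframes_dtype):

import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes

import bigframes.dtypes

def to_bigframes_dtype_sketch(ibis_dtype: ibis_dtypes.DataType):
    # After this change, a JSON column surfaces as JSON_DTYPE, i.e.
    # pd.ArrowDtype(pa.large_string()), instead of STRING_DTYPE, and no
    # ToJsonString cast is injected in ibis_value_to_canonical_type.
    if isinstance(ibis_dtype, ibis_dtypes.JSON):
        return bigframes.dtypes.JSON_DTYPE
    raise NotImplementedError("sketch covers only the JSON case")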
16 changes: 13 additions & 3 deletions bigframes/core/compile/scalar_op_compiler.py
@@ -1181,7 +1181,10 @@ def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet):

@scalar_op_compiler.register_unary_op(ops.JSONExtract, pass_op=True)
def json_extract_op_impl(x: ibis_types.Value, op: ops.JSONExtract):
return json_extract(json_obj=x, json_path=op.json_path)
if x.type().is_json():
return json_extract(json_obj=x, json_path=op.json_path)
# json string
return json_extract_string(json_obj=x, json_path=op.json_path)


@scalar_op_compiler.register_unary_op(ops.JSONExtractArray, pass_op=True)
@@ -1845,7 +1848,7 @@ def float_ceil(a: float) -> float:


@ibis_udf.scalar.builtin(name="parse_json")
def parse_json(a: str) -> ibis_dtypes.JSON: # type: ignore[empty-body]
def parse_json(json_str: str) -> ibis_dtypes.JSON: # type: ignore[empty-body]
"""Converts a JSON-formatted STRING value to a JSON value."""


@@ -1860,7 +1863,14 @@ def json_set( # type: ignore[empty-body]
def json_extract( # type: ignore[empty-body]
json_obj: ibis_dtypes.JSON, json_path: ibis_dtypes.String
) -> ibis_dtypes.JSON:
"""Extracts a JSON value and converts it to a SQL JSON-formatted STRING or JSON value."""
"""Extracts a JSON value and converts it to a JSON value."""


@ibis_udf.scalar.builtin(name="json_extract")
def json_extract_string( # type: ignore[empty-body]
json_obj: ibis_dtypes.String, json_path: ibis_dtypes.String
) -> ibis_dtypes.String:
"""Extracts a JSON SRING value and converts it to a SQL JSON-formatted STRING."""


@ibis_udf.scalar.builtin(name="json_extract_array")
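The compiler now registers two wrappers for the same builtin because BigQuery's JSON_EXTRACT is overloaded: JSON input returns JSON, STRING input returns a JSON-formatted STRING. At the user level that looks roughly like the sketch below; the exact bbq.json_extract call shape is an assumption about the bigframes.bigquery API, not something this diff shows.

import bigframes.bigquery as bbq
import bigframes.pandas as bpd

# JSON-typed column: compiled through the JSON overload, so the result keeps
# the JSON dtype (large_string[pyarrow]).
json_series = bpd.read_gbq("SELECT JSON '{\"a\": 1}' AS data")["data"]
from_json = bbq.json_extract(json_series, "$.a")

# Plain string column: compiled through the new STRING overload, so the
# result stays an ordinary string (hence check_dtype=False in the test below).
string_series = bpd.Series(['{"a": 1}'])
from_string = bbq.json_extract(string_series, "$.a")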
28 changes: 16 additions & 12 deletions bigframes/dtypes.py
@@ -55,6 +55,8 @@
BIGNUMERIC_DTYPE = pd.ArrowDtype(pa.decimal256(76, 38))
# No arrow equivalent
GEO_DTYPE = gpd.array.GeometryDtype()
# JSON
JSON_DTYPE = pd.ArrowDtype(pa.large_string())

# Used when storing Null expressions
DEFAULT_DTYPE = FLOAT_DTYPE
@@ -132,6 +134,13 @@ class SimpleDtypeInfo:
orderable=True,
clusterable=True,
),
SimpleDtypeInfo(
dtype=JSON_DTYPE,
arrow_dtype=pa.large_string(),
type_kind=("JSON",),
orderable=False,
clusterable=False,
),
SimpleDtypeInfo(
dtype=DATE_DTYPE,
arrow_dtype=pa.date32(),
@@ -281,7 +290,7 @@ def is_struct_like(type_: ExpressionType) -> bool:

def is_json_like(type_: ExpressionType) -> bool:
# TODO: Add JSON type support
return type_ == STRING_DTYPE
return type_ == JSON_DTYPE or type_ == STRING_DTYPE # Including JSON string


def is_json_encoding_type(type_: ExpressionType) -> bool:
@@ -455,8 +464,6 @@ def infer_literal_arrow_type(literal) -> typing.Optional[pa.DataType]:
return bigframes_dtype_to_arrow_dtype(infer_literal_type(literal))


# Don't have dtype for json, so just end up interpreting as STRING
_REMAPPED_TYPEKINDS = {"JSON": "STRING"}
_TK_TO_BIGFRAMES = {
type_kind: mapping.dtype
for mapping in SIMPLE_TYPES
@@ -480,16 +487,13 @@ def convert_schema_field(
pa_struct = pa.struct(fields)
pa_type = pa.list_(pa_struct) if is_repeated else pa_struct
return field.name, pd.ArrowDtype(pa_type)
elif (
field.field_type in _TK_TO_BIGFRAMES or field.field_type in _REMAPPED_TYPEKINDS
):
singular_type = _TK_TO_BIGFRAMES[
_REMAPPED_TYPEKINDS.get(field.field_type, field.field_type)
]
elif field.field_type in _TK_TO_BIGFRAMES:
if is_repeated:
pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(singular_type))
pa_type = pa.list_(
bigframes_dtype_to_arrow_dtype(_TK_TO_BIGFRAMES[field.field_type])
)
return field.name, pd.ArrowDtype(pa_type)
return field.name, singular_type
return field.name, _TK_TO_BIGFRAMES[field.field_type]
else:
raise ValueError(f"Cannot handle type: {field.field_type}")

@@ -639,7 +643,7 @@ def can_coerce(source_type: ExpressionType, target_type: ExpressionType) -> bool
return True # None can be coerced to any supported type
else:
return (source_type == STRING_DTYPE) and (
target_type in TEMPORAL_BIGFRAMES_TYPES
target_type in TEMPORAL_BIGFRAMES_TYPES + [JSON_DTYPE]
)


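A small sketch of what the dtypes.py changes add up to, using the module-level names shown in the hunks above; this is an illustrative summary of the merged behavior, not a test taken from the change set.

import pandas as pd
import pyarrow as pa

import bigframes.dtypes as dtypes

# The new JSON dtype, and how it participates in JSON checks and coercion.
assert dtypes.JSON_DTYPE == pd.ArrowDtype(pa.large_string())
assert dtypes.is_json_like(dtypes.JSON_DTYPE)
assert dtypes.is_json_like(dtypes.STRING_DTYPE)  # JSON-formatted strings still count
assert dtypes.can_coerce(dtypes.STRING_DTYPE, dtypes.JSON_DTYPE)
# Only strings may coerce to JSON; INT_DTYPE is the library's Int64 constant.
assert not dtypes.can_coerce(dtypes.INT_DTYPE, dtypes.JSON_DTYPE)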
6 changes: 4 additions & 2 deletions bigframes/session/executor.py
@@ -393,6 +393,7 @@ def peek(
def head(
self, array_value: bigframes.core.ArrayValue, n_rows: int
) -> ExecuteResult:

maybe_row_count = self._local_get_row_count(array_value)
if (maybe_row_count is not None) and (maybe_row_count <= n_rows):
return self.execute(array_value, ordered=True)
@@ -452,7 +453,7 @@ def cached(
# use a heuristic for whether something needs to be cached
if (not force) and self._is_trivially_executable(array_value):
return
elif use_session:
if use_session:
self._cache_with_session_awareness(array_value)
else:
self._cache_with_cluster_cols(array_value, cluster_cols=cluster_cols)
@@ -656,7 +657,7 @@ def _sql_as_cached_temp_table(
def _validate_result_schema(
self,
array_value: bigframes.core.ArrayValue,
bq_schema: list[bigquery.schema.SchemaField],
bq_schema: list[bigquery.SchemaField],
):
actual_schema = tuple(bq_schema)
ibis_schema = bigframes.core.compile.test_only_ibis_inferred_schema(
@@ -665,6 +666,7 @@ def _validate_result_schema(
internal_schema = array_value.schema
if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
return

if internal_schema.to_bigquery() != actual_schema:
raise ValueError(
f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}"
9 changes: 9 additions & 0 deletions tests/system/small/bigquery/test_json.py
@@ -134,6 +134,7 @@ def test_json_extract_from_string():
actual.to_pandas(),
expected.to_pandas(),
check_names=False,
check_dtype=False,  # json_extract returns a string dtype here, while _get_series_from_json gives a JSON series (pa.large_string).
)


@@ -200,3 +201,11 @@ def test_json_extract_string_array_as_float_array_from_array_strings():
def test_json_extract_string_array_w_invalid_series_type():
with pytest.raises(TypeError):
bbq.json_extract_string_array(bpd.Series([1, 2]))


# b/381148539
def test_json_in_struct():
df = bpd.read_gbq(
"SELECT STRUCT(JSON '{\\\"a\\\": 1}' AS data, 1 AS number) as struct_col"
)
assert df["struct_col"].struct.field("data")[0] == '{"a":1}'
2 changes: 1 addition & 1 deletion tests/system/small/test_dataframe_io.py
@@ -259,7 +259,7 @@ def test_load_json(session):
{
"json_column": ['{"bar":true,"foo":10}'],
},
dtype=pd.StringDtype(storage="pyarrow"),
dtype=pd.ArrowDtype(pa.large_string()),
)
expected.index = expected.index.astype("Int64")
pd.testing.assert_series_equal(result.dtypes, expected.dtypes)
2 changes: 1 addition & 1 deletion tests/system/small/test_series.py
@@ -281,7 +281,7 @@ def test_get_column(scalars_dfs, col_name, expected_dtype):
def test_get_column_w_json(json_df, json_pandas_df):
series = json_df["json_col"]
series_pandas = series.to_pandas()
assert series.dtype == pd.StringDtype(storage="pyarrow")
assert series.dtype == pd.ArrowDtype(pa.large_string())
assert series_pandas.shape[0] == json_pandas_df.shape[0]

