24 changes: 0 additions & 24 deletions .buildkite/data.rayci.yml
@@ -28,30 +28,6 @@ steps:
wanda: ci/docker/datatfxbsl.build.wanda.yaml

# tests
- label: ":database: data: arrow v9 tests"
tags:
- data
instance_type: medium
parallelism: 2
commands:
- bazel run //ci/ray_ci:test_in_docker -- //python/ray/data/... //python/ray/air/... data
--workers "$${BUILDKITE_PARALLEL_JOB_COUNT}"
--worker-id "$${BUILDKITE_PARALLEL_JOB}" --parallelism-per-worker 3
--build-name data9build
--except-tags data_integration,doctest,data_non_parallel
depends_on: data9build

- label: ":database: data: arrow v9 tests (data_non_parallel)"
tags:
- data
- data_non_parallel
instance_type: medium
commands:
- bazel run //ci/ray_ci:test_in_docker -- //python/ray/data/... //python/ray/air/... data
--build-name data9build
--only-tags data_non_parallel
depends_on: data9build

- label: ":database: data: arrow v19 tests"
tags:
- python
4 changes: 0 additions & 4 deletions python/ray/_private/arrow_utils.py
@@ -82,10 +82,6 @@ def add_creatable_buckets_param_if_s3_uri(uri: str) -> str:
URI is an S3 URL; uri will be returned unchanged otherwise.
"""

pyarrow_version = get_pyarrow_version()
if pyarrow_version is not None and pyarrow_version < parse_version("9.0.0"):
# This bucket creation query parameter is not required for pyarrow < 9.0.0.
return uri
parsed_uri = urlparse(uri)
if parsed_uri.scheme == "s3":
uri = _add_url_query_params(uri, {"allow_bucket_creation": True})
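With the version guard removed, `add_creatable_buckets_param_if_s3_uri` always appends the bucket-creation flag to S3 URIs. A minimal, self-contained sketch of the resulting behavior (the real implementation delegates to `_add_url_query_params`; the urllib-based rewrite here is an approximation):

```python
from urllib.parse import urlencode, urlparse, urlunparse


def add_creatable_buckets_param_if_s3_uri(uri: str) -> str:
    # Post-change behavior: no pyarrow version check; the query parameter
    # is appended for every s3:// URI, and other URIs pass through unchanged.
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        return uri
    extra = urlencode({"allow_bucket_creation": True})
    query = f"{parsed.query}&{extra}" if parsed.query else extra
    return urlunparse(parsed._replace(query=query))


assert (
    add_creatable_buckets_param_if_s3_uri("s3://bucket/key")
    == "s3://bucket/key?allow_bucket_creation=True"
)
assert add_creatable_buckets_param_if_s3_uri("/local/path") == "/local/path"
```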
5 changes: 0 additions & 5 deletions python/ray/air/tests/test_air_usage.py
@@ -87,11 +87,6 @@ def test_tag_storage_type(storage_path_filesystem_expected, mock_record, monkeyp

storage_path, storage_filesystem, expected = storage_path_filesystem_expected

if Version(pyarrow.__version__) < Version("9.0.0") and storage_path.startswith(
"gs://"
):
pytest.skip("GCS support requires pyarrow >= 9.0.0")

storage = StorageContext(
storage_path=storage_path,
experiment_dir_name="test",
22 changes: 0 additions & 22 deletions python/ray/air/tests/test_object_extension.py
@@ -8,14 +8,10 @@
from ray.air.util.object_extensions.arrow import (
ArrowPythonObjectArray,
ArrowPythonObjectType,
_object_extension_type_allowed,
)
from ray.air.util.object_extensions.pandas import PythonObjectArray


@pytest.mark.skipif(
not _object_extension_type_allowed(), reason="Object extension not supported."
)
def test_object_array_validation():
# Test unknown input type raises TypeError.
with pytest.raises(TypeError):
@@ -25,9 +21,6 @@ def test_object_array_validation():
PythonObjectArray([object(), object()])


@pytest.mark.skipif(
not _object_extension_type_allowed(), reason="Object extension not supported."
)
def test_arrow_scalar_object_array_roundtrip():
arr = np.array(
["test", 20, False, {"some": "value"}, None, np.zeros((10, 10))], dtype=object
@@ -41,38 +34,26 @@ def test_arrow_scalar_object_array_roundtrip():
assert np.all(out[-1] == arr[-1])


@pytest.mark.skipif(
not _object_extension_type_allowed(), reason="Object extension not supported."
)
def test_arrow_python_object_array_slice():
arr = np.array(["test", 20, "test2", 40, "test3", 60], dtype=object)
ata = ArrowPythonObjectArray.from_objects(arr)
assert list(ata[1:3].to_pandas()) == [20, "test2"]
assert ata[2:4].to_pylist() == ["test2", 40]


@pytest.mark.skipif(
not _object_extension_type_allowed(), reason="Object extension not supported."
)
def test_arrow_pandas_roundtrip():
obj = types.SimpleNamespace(a=1, b="test")
t1 = pa.table({"a": ArrowPythonObjectArray.from_objects([obj, obj]), "b": [0, 1]})
t2 = pa.Table.from_pandas(t1.to_pandas())
assert t1.equals(t2)


@pytest.mark.skipif(
not _object_extension_type_allowed(), reason="Object extension not supported."
)
def test_pandas_python_object_isna():
arr = np.array([1, np.nan, 3, 4, 5, np.nan, 7, 8, 9], dtype=object)
ta = PythonObjectArray(arr)
np.testing.assert_array_equal(ta.isna(), pd.isna(arr))


@pytest.mark.skipif(
not _object_extension_type_allowed(), reason="Object extension not supported."
)
def test_pandas_python_object_take():
arr = np.array([1, 2, 3, 4, 5], dtype=object)
ta = PythonObjectArray(arr)
@@ -85,9 +66,6 @@
)


@pytest.mark.skipif(
not _object_extension_type_allowed(), reason="Object extension not supported."
)
def test_pandas_python_object_concat():
arr1 = np.array([1, 2, 3, 4, 5], dtype=object)
arr2 = np.array([6, 7, 8, 9, 10], dtype=object)
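These tests no longer guard on the pyarrow version at all, since `_object_extension_type_allowed` is gone. For code outside this module that still wants such a guard, pinning the requirement directly with plain pytest is one option; a sketch (the 9.0.0 bound is taken from this PR, and nothing here is exported by Ray):

```python
import pytest

# Skip the whole test module unless pyarrow >= 9.0.0 is importable,
# roughly replacing the removed _object_extension_type_allowed() guard.
pa = pytest.importorskip("pyarrow", minversion="9.0.0")
```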
62 changes: 8 additions & 54 deletions python/ray/air/tests/test_tensor_extension.py
@@ -517,20 +517,8 @@ def test_arrow_tensor_array_getitem(chunked, restore_data_context, tensor_format
if chunked:
t_arr = pa.chunked_array(t_arr)

pyarrow_version = get_pyarrow_version()
if (
chunked
and pyarrow_version >= parse_version("8.0.0")
and pyarrow_version < parse_version("9.0.0")
):
for idx in range(outer_dim):
item = t_arr[idx]
assert isinstance(item, pa.ExtensionScalar)
item = item.type._extension_scalar_to_ndarray(item)
np.testing.assert_array_equal(item, arr[idx])
else:
for idx in range(outer_dim):
np.testing.assert_array_equal(t_arr[idx], arr[idx])
for idx in range(outer_dim):
np.testing.assert_array_equal(t_arr[idx], arr[idx])

# Test __iter__.
for t_subarr, subarr in zip(t_arr, arr):
@@ -554,19 +542,8 @@ def test_arrow_tensor_array_getitem(chunked, restore_data_context, tensor_format

np.testing.assert_array_equal(t_arr2_npy, arr[1:])

if (
chunked
and pyarrow_version >= parse_version("8.0.0")
and pyarrow_version < parse_version("9.0.0")
):
for idx in range(1, outer_dim):
item = t_arr2[idx - 1]
assert isinstance(item, pa.ExtensionScalar)
item = item.type._extension_scalar_to_ndarray(item)
np.testing.assert_array_equal(item, arr[idx])
else:
for idx in range(1, outer_dim):
np.testing.assert_array_equal(t_arr2[idx - 1], arr[idx])
for idx in range(1, outer_dim):
np.testing.assert_array_equal(t_arr2[idx - 1], arr[idx])


@pytest.mark.parametrize("tensor_format", ["v1", "v2"])
@@ -589,20 +566,8 @@ def test_arrow_variable_shaped_tensor_array_getitem(
if chunked:
t_arr = pa.chunked_array(t_arr)

pyarrow_version = get_pyarrow_version()
if (
chunked
and pyarrow_version >= parse_version("8.0.0")
and pyarrow_version < parse_version("9.0.0")
):
for idx in range(outer_dim):
item = t_arr[idx]
assert isinstance(item, pa.ExtensionScalar)
item = item.type._extension_scalar_to_ndarray(item)
np.testing.assert_array_equal(item, arr[idx])
else:
for idx in range(outer_dim):
np.testing.assert_array_equal(t_arr[idx], arr[idx])
for idx in range(outer_dim):
np.testing.assert_array_equal(t_arr[idx], arr[idx])

# Test __iter__.
for t_subarr, subarr in zip(t_arr, arr):
@@ -628,19 +593,8 @@ def test_arrow_variable_shaped_tensor_array_getitem(
for t_subarr, subarr in zip(t_arr2_npy, arr[1:]):
np.testing.assert_array_equal(t_subarr, subarr)

if (
chunked
and pyarrow_version >= parse_version("8.0.0")
and pyarrow_version < parse_version("9.0.0")
):
for idx in range(1, outer_dim):
item = t_arr2[idx - 1]
assert isinstance(item, pa.ExtensionScalar)
item = item.type._extension_scalar_to_ndarray(item)
np.testing.assert_array_equal(item, arr[idx])
else:
for idx in range(1, outer_dim):
np.testing.assert_array_equal(t_arr2[idx - 1], arr[idx])
for idx in range(1, outer_dim):
np.testing.assert_array_equal(t_arr2[idx - 1], arr[idx])


@pytest.mark.parametrize("tensor_format", ["v1", "v2"])
13 changes: 1 addition & 12 deletions python/ray/air/util/object_extensions/arrow.py
@@ -10,17 +10,6 @@
from ray._private.arrow_utils import get_pyarrow_version
from ray.util.annotations import PublicAPI

MIN_PYARROW_VERSION_SCALAR_SUBCLASS = parse_version("9.0.0")

PYARROW_VERSION = get_pyarrow_version()


def _object_extension_type_allowed() -> bool:
return (
PYARROW_VERSION is not None
and PYARROW_VERSION >= MIN_PYARROW_VERSION_SCALAR_SUBCLASS
)


# Please see https://arrow.apache.org/docs/python/extending_types.html for more info
@PublicAPI(stability="alpha")
@@ -89,7 +78,7 @@ class ArrowPythonObjectArray(pa.ExtensionArray):
"""Array class for ArrowPythonObjectType"""

def from_objects(
objects: typing.Union[np.ndarray, typing.Iterable[typing.Any]]
objects: typing.Union[np.ndarray, typing.Iterable[typing.Any]],
) -> "ArrowPythonObjectArray":
if isinstance(objects, np.ndarray):
objects = objects.tolist()
41 changes: 15 additions & 26 deletions python/ray/air/util/tensor_extensions/arrow.py
@@ -132,40 +132,29 @@ def convert_to_pyarrow_array(

except ArrowConversionError as ace:
from ray.data import DataContext
from ray.data.extensions.object_extension import (
ArrowPythonObjectArray,
_object_extension_type_allowed,
)
from ray.data.extensions.object_extension import ArrowPythonObjectArray

enable_fallback_config: Optional[
bool
] = DataContext.get_current().enable_fallback_to_arrow_object_ext_type

if not _object_extension_type_allowed():
object_ext_type_fallback_allowed = False
# NOTE: By default setting is unset which (for compatibility reasons)
# is allowing the fallback
object_ext_type_fallback_allowed = (
enable_fallback_config is None or enable_fallback_config
)

if object_ext_type_fallback_allowed:
object_ext_type_detail = (
"skipping fallback to serialize as pickled python"
f" objects (due to unsupported Arrow version {PYARROW_VERSION}, "
f"min required version is {MIN_PYARROW_VERSION_SCALAR_SUBCLASS})"
"falling back to serialize as pickled python objects"
)
else:
# NOTE: By default setting is unset which (for compatibility reasons)
# is allowing the fallback
object_ext_type_fallback_allowed = (
enable_fallback_config is None or enable_fallback_config
object_ext_type_detail = (
"skipping fallback to serialize as pickled python objects "
"(due to DataContext.enable_fallback_to_arrow_object_ext_type "
"= False)"
)

if object_ext_type_fallback_allowed:
object_ext_type_detail = (
"falling back to serialize as pickled python objects"
)
else:
object_ext_type_detail = (
"skipping fallback to serialize as pickled python objects "
"(due to DataContext.enable_fallback_to_arrow_object_ext_type "
"= False)"
)

# To avoid logging following warning for every block it's
# only going to be logged in following cases
# - It's being logged for the first time, and
@@ -293,7 +282,7 @@ def _coerce_np_datetime_to_pa_timestamp_precision(


def _infer_pyarrow_type(
column_values: Union[List[Any], np.ndarray]
column_values: Union[List[Any], np.ndarray],
) -> Optional[pa.DataType]:
"""Infers target Pyarrow `DataType` based on the provided
columnar values.
@@ -375,7 +364,7 @@ def _len_gt_overflow_threshold(obj: Any) -> bool:


def _try_infer_pa_timestamp_type(
column_values: Union[List[Any], np.ndarray]
column_values: Union[List[Any], np.ndarray],
) -> Optional[pa.DataType]:
if isinstance(column_values, list) and len(column_values) > 0:
# In case provided column values is a list of elements, this
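After this change, the fallback in `convert_to_pyarrow_array` is controlled only by `DataContext.enable_fallback_to_arrow_object_ext_type` (unset means allowed), not by the installed pyarrow version. A sketch of opting out of the pickled-object fallback, assuming the setting is used as shown in the hunk above:

```python
from ray.data import DataContext

ctx = DataContext.get_current()
# None (unset) or True keeps the fallback enabled; False disables it,
# so Arrow conversion errors surface instead of pickling the objects.
ctx.enable_fallback_to_arrow_object_ext_type = False
```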
20 changes: 5 additions & 15 deletions python/ray/data/_internal/arrow_block.py
@@ -195,21 +195,11 @@ def _build_tensor_row(
from packaging.version import parse as parse_version

element = row[col_name][0]
# TODO(Clark): Reduce this to np.asarray(element) once we only support Arrow
# 9.0.0+.
pyarrow_version = get_pyarrow_version()
if pyarrow_version is None or pyarrow_version >= parse_version("8.0.0"):
assert isinstance(element, pyarrow.ExtensionScalar)
if pyarrow_version is None or pyarrow_version >= parse_version("9.0.0"):
# For Arrow 9.0.0+, accessing an element in a chunked tensor array
# produces an ArrowTensorScalar, which we convert to an ndarray using
# .as_py().
element = element.as_py()
else:
# For Arrow 8.*, accessing an element in a chunked tensor array produces
# an ExtensionScalar, which we convert to an ndarray using our custom
# method.
element = element.type._extension_scalar_to_ndarray(element)
assert isinstance(element, pyarrow.ExtensionScalar)
# For Arrow 9.0.0+, accessing an element in a chunked tensor array
# produces an ArrowTensorScalar, which we convert to an ndarray using
# .as_py().
element = element.as_py()
# For Arrow < 8.0.0, accessing an element in a chunked tensor array produces an
# ndarray, which we return directly.
assert isinstance(element, np.ndarray), type(element)
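`_build_tensor_row` now assumes the Arrow 9+ behavior unconditionally: indexing a chunked tensor extension array yields an `ExtensionScalar`, and `.as_py()` materializes it as an ndarray. A small sketch of that access pattern (construction via `ArrowTensorArray.from_numpy` is assumed from Ray's tensor extension API):

```python
import numpy as np
import pyarrow as pa

from ray.data.extensions.tensor_extension import ArrowTensorArray

# Three 2x2 tensors stored in one extension array, then wrapped in chunks.
tensors = np.arange(12).reshape(3, 2, 2)
chunked = pa.chunked_array(ArrowTensorArray.from_numpy(tensors))

element = chunked[0]
assert isinstance(element, pa.ExtensionScalar)
# With pyarrow >= 9, as_py() returns the element as an ndarray.
np.testing.assert_array_equal(element.as_py(), tensors[0])
```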
2 changes: 0 additions & 2 deletions python/ray/data/extensions/__init__.py
@@ -8,7 +8,6 @@
ArrowPythonObjectType,
PythonObjectArray,
PythonObjectDtype,
_object_extension_type_allowed,
)
from ray.data.extensions.tensor_extension import (
ArrowConversionError,
@@ -40,6 +39,5 @@
"ArrowPythonObjectScalar",
"PythonObjectArray",
"PythonObjectDtype",
"_object_extension_type_allowed",
"get_arrow_extension_tensor_types",
]
1 change: 0 additions & 1 deletion python/ray/data/extensions/object_extension.py
@@ -2,7 +2,6 @@
ArrowPythonObjectArray,
ArrowPythonObjectScalar,
ArrowPythonObjectType,
_object_extension_type_allowed,
)
from ray.air.util.object_extensions.pandas import ( # noqa: F401
PythonObjectArray,
5 changes: 2 additions & 3 deletions python/ray/data/tests/conftest.py
@@ -127,9 +127,8 @@ def _s3_fs(aws_credentials, s3_server, s3_path):

kwargs = aws_credentials.copy()

if get_pyarrow_version() >= parse_version("9.0.0"):
kwargs["allow_bucket_creation"] = True
kwargs["allow_bucket_deletion"] = True
kwargs["allow_bucket_creation"] = True
kwargs["allow_bucket_deletion"] = True

fs = pa.fs.S3FileSystem(
region="us-west-2",
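Because the minimum pyarrow version now includes them, the `allow_bucket_creation` and `allow_bucket_deletion` flags are passed to `S3FileSystem` unconditionally. A sketch of the resulting call (credentials and endpoint below are placeholders, not the fixture's actual values):

```python
import pyarrow.fs as pafs

# Both flags are accepted by S3FileSystem in pyarrow >= 9, so no
# version check is needed before passing them.
fs = pafs.S3FileSystem(
    region="us-west-2",
    access_key="testing",  # placeholder credential
    secret_key="testing",  # placeholder credential
    endpoint_override="http://localhost:5002",  # placeholder mock-S3 endpoint
    allow_bucket_creation=True,
    allow_bucket_deletion=True,
)
```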