From ec319cccfe7fd4b67b414ed4ed4e9520149b7638 Mon Sep 17 00:00:00 2001 From: wjsi Date: Tue, 3 Oct 2023 17:20:10 +0800 Subject: [PATCH] Fix case --- mars/dataframe/base/bloom_filter.py | 2 +- mars/dataframe/reduction/aggregation.py | 41 +++++++++++++------ mars/dataframe/reduction/mode.py | 14 ++++--- mars/dataframe/reduction/nunique.py | 8 ++-- .../tests/test_reduction_execution.py | 35 ++++++++++++---- mars/deploy/oscar/session.py | 2 + mars/deploy/oscar/tests/session.py | 2 + mars/tests/test_session.py | 6 +-- 8 files changed, 76 insertions(+), 34 deletions(-) diff --git a/mars/dataframe/base/bloom_filter.py b/mars/dataframe/base/bloom_filter.py index 65b9a14591..ac3da3a750 100644 --- a/mars/dataframe/base/bloom_filter.py +++ b/mars/dataframe/base/bloom_filter.py @@ -175,7 +175,7 @@ def _build_dataframe_filter(cls, in_data: pd.DataFrame, op: "DataFrameBloomFilte def _convert_to_hashable_dtypes(cls, dtypes: pd.Series): dtypes = dict( (name, dtype) if np.issubdtype(dtype, int) else (name, str) - for name, dtype in dtypes.iteritems() + for name, dtype in dtypes.items() ) return dtypes diff --git a/mars/dataframe/reduction/aggregation.py b/mars/dataframe/reduction/aggregation.py index 34390f4457..32fbc94f73 100644 --- a/mars/dataframe/reduction/aggregation.py +++ b/mars/dataframe/reduction/aggregation.py @@ -139,6 +139,15 @@ def _calc_result_shape(self, df): self.output_types = [OutputType.scalar] return np.array(result_df).dtype, None, 0 + def _get_reduced_dim_unit(self, in_ndim, out_ndim): + """ + If rows can be reduced into multiple columns, return nan, + otherwise returns 1 + """ + if not isinstance(self.raw_func, str) and isinstance(self.raw_func, Iterable): + return 1 + return 1 if in_ndim != out_ndim else np.nan + def __call__(self, df, output_type=None, dtypes=None, index=None): self._output_types = df.op.output_types normalize_reduction_funcs(self, ndim=df.ndim) @@ -154,9 +163,7 @@ def __call__(self, df, output_type=None, dtypes=None, index=None): else: out_ndim = 0 - reduced_len = ( - 1 if df.ndim != out_ndim or isinstance(self.raw_func, list) else np.nan - ) + reduced_len = self._get_reduced_dim_unit(df.ndim, out_ndim) if self.output_types[0] == OutputType.dataframe: if self.axis == 0: new_shape = (len(index) * reduced_len, len(dtypes)) @@ -213,7 +220,7 @@ def _safe_append(d, key, val): @classmethod def _gen_map_chunks( cls, - op, + op: "DataFrameAggregate", in_df, out_df, func_infos: List[ReductionSteps], @@ -232,9 +239,7 @@ def _gen_map_chunks( agg_chunks = np.empty(agg_chunks_shape, dtype=object) dtypes_cache = dict() - reduced_len = ( - 1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan - ) + reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim) for chunk in in_df.chunks: input_index = chunk.index[1 - axis] if len(chunk.index) > 1 else 0 if input_index not in input_index_to_output: @@ -438,9 +443,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"): chunks = cls._gen_map_chunks( op, in_df, out_df, axis_func_infos, input_index_to_output ) - reduced_len = ( - 1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan - ) + reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim) while chunks.shape[axis] > combine_size: if axis == 0: new_chunks_shape = ( @@ -715,6 +718,8 @@ def _do_predefined_agg(cls, op: "DataFrameAggregate", input_obj, func_name, kwds raise NotImplementedError("numeric_only not implemented under cudf") if isinstance(input_obj, pd.Index): kwds.pop("skipna", None) + if getattr(input_obj, "ndim", 0) > 1: + kwds["axis"] = op.axis return getattr(input_obj, func_name)(**kwds) @classmethod @@ -865,7 +870,19 @@ def _execute_agg(cls, ctx, op: "DataFrameAggregate"): ser_index = None if agg_series_ndim < out.ndim: ser_index = [func_name] - aggs.append(cls._wrap_df(op, agg_series, index=ser_index)) + if ( + isinstance(agg_series, np.ndarray) + and getattr(func_inputs[0], "ndim", 0) >= 1 + and hasattr(func_inputs[0], "index") + ): + agg_series = cls._wrap_df(op, agg_series, index=ser_index) + if op.axis == 0: + agg_series.columns = func_inputs[0].index + else: + agg_series.index = func_inputs[0].index + else: + agg_series = cls._wrap_df(op, agg_series, index=ser_index) + aggs.append(agg_series) # concatenate to produce final result concat_df = xdf.concat(aggs, axis=axis) @@ -931,7 +948,7 @@ def execute(cls, ctx, op: "DataFrameAggregate"): ): result = op.func[0](in_data) else: - result = in_data.agg(op.raw_func, axis=op.axis) + result = in_data.agg(op.raw_func, axis=op.axis, **op.raw_func_kw) if op.outputs[0].ndim == 1: result = result.astype(op.outputs[0].dtype, copy=False) diff --git a/mars/dataframe/reduction/mode.py b/mars/dataframe/reduction/mode.py index 9e4494fcd7..265a752a65 100644 --- a/mars/dataframe/reduction/mode.py +++ b/mars/dataframe/reduction/mode.py @@ -41,6 +41,12 @@ def __init__( self._numeric_only = numeric_only self._dropna = dropna + @staticmethod + def _explode_dict_series(s: pd.Series) -> pd.DataFrame: + exploded = s.apply(pd.Series) + # if exploded.columns.hasnans: + return exploded + def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xdf = cudf if self.is_gpu() else pd if isinstance(in_data, xdf.Series): @@ -48,7 +54,7 @@ def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ else: if self._axis == 0: data = dict() - for d, v in in_data.iteritems(): + for d, v in in_data.items(): data[d] = [v.value_counts(dropna=self._dropna).to_dict()] df = xdf.DataFrame(data) else: @@ -64,7 +70,7 @@ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ else: if self._axis == 0: data = dict() - for d, v in in_data.iteritems(): + for d, v in in_data.items(): data[d] = [v.apply(pd.Series).sum().to_dict()] df = xdf.DataFrame(data) else: @@ -85,9 +91,7 @@ def _handle_series(s): if isinstance(in_data, xdf.Series): return _handle_series(in_data) else: - in_data_iter = ( - in_data.iteritems() if self._axis == 0 else in_data.iterrows() - ) + in_data_iter = in_data.items() if self._axis == 0 else in_data.iterrows() s_list = [] for d, v in in_data_iter: if isinstance(v.dtype, ArrowListDtype): diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 3bcafd3db4..cac883bbec 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -59,7 +59,7 @@ def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ else: if self._axis == 0: data = dict() - for d, v in in_data.iteritems(): + for d, v in in_data.items(): if not self._use_arrow_dtype or xdf is cudf: data[d] = [v.drop_duplicates().to_list()] else: @@ -82,7 +82,7 @@ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ else: if self._axis == 0: data = dict() - for d, v in in_data.iteritems(): + for d, v in in_data.items(): if not self._use_arrow_dtype or xdf is cudf: data[d] = [v.explode().drop_duplicates().to_list()] else: @@ -103,9 +103,7 @@ def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ if isinstance(in_data, xdf.Series): return in_data.explode().nunique(dropna=self._dropna) else: - in_data_iter = ( - in_data.iteritems() if self._axis == 0 else in_data.iterrows() - ) + in_data_iter = in_data.items() if self._axis == 0 else in_data.iterrows() data = dict() for d, v in in_data_iter: if isinstance(v.dtype, ArrowListDtype): diff --git a/mars/dataframe/reduction/tests/test_reduction_execution.py b/mars/dataframe/reduction/tests/test_reduction_execution.py index 7e7d2c4459..2416d57970 100644 --- a/mars/dataframe/reduction/tests/test_reduction_execution.py +++ b/mars/dataframe/reduction/tests/test_reduction_execution.py @@ -36,6 +36,7 @@ cp = lazy_import("cupy", rename="cp") _agg_size_as_series = pd_release_version >= (1, 3) _support_kw_agg = pd_release_version >= (1, 1) +_drop_level_reduction = pd_release_version >= (2, 0) @pytest.fixture @@ -119,6 +120,9 @@ def compute(data, **kwargs): np.testing.assert_equal(r.execute().fetch(), compute(data)) +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name,func_opts", reduction_functions) def test_series_level_reduction(setup, func_name, func_opts: FunctionOptions): def compute(data, **kwargs): @@ -162,6 +166,9 @@ def compute(data, **kwargs): ) +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name,func_opts", reduction_functions) def test_dataframe_reduction( setup, check_ref_counts, func_name, func_opts: FunctionOptions @@ -255,6 +262,9 @@ def compute(data, **kwargs): ) +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name,func_opts", reduction_functions) def test_dataframe_level_reduction( setup, check_ref_counts, func_name, func_opts: FunctionOptions @@ -403,6 +413,9 @@ def compute(data, **kwargs): assert r.execute().fetch() is True +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name", bool_reduction_functions) def test_series_bool_level_reduction(setup, check_ref_counts, func_name): def compute(data, **kwargs): @@ -510,6 +523,9 @@ def compute(data, **kwargs): ) +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name", bool_reduction_functions) def test_dataframe_bool_level_reduction(setup, check_ref_counts, func_name): def compute(data, **kwargs): @@ -685,18 +701,20 @@ def test_mode(setup, check_ref_counts): config_kw = { "extra_config": { "check_shape": False, + "check_dtypes": False, + "check_columns_value": False, "check_index_value": False, } } data1 = pd.Series(np.random.randint(0, 5, size=(20,))) series = md.Series(data1) - result = series.mode().execute().fetch() + result = series.mode().execute(**config_kw).fetch(**config_kw) expected = data1.mode() pd.testing.assert_series_equal(result, expected) series = md.Series(data1, chunk_size=6) - result = series.mode().execute().fetch() + result = series.mode().execute(**config_kw).fetch(**config_kw) expected = data1.mode() pd.testing.assert_series_equal(result, expected) @@ -705,7 +723,7 @@ def test_mode(setup, check_ref_counts): data2[[2, 9, 18]] = np.nan series = md.Series(data2) - result = series.mode().execute().fetch() + result = series.mode().execute(**config_kw).fetch(**config_kw) expected = data2.mode() pd.testing.assert_series_equal(result, expected) @@ -720,7 +738,7 @@ def test_mode(setup, check_ref_counts): columns=["c" + str(i) for i in range(20)], ) df = md.DataFrame(data1) - result = df.mode().execute().fetch() + result = df.mode().execute(**config_kw).fetch(**config_kw) expected = data1.mode() pd.testing.assert_frame_equal(result, expected) @@ -730,7 +748,7 @@ def test_mode(setup, check_ref_counts): pd.testing.assert_frame_equal(result, expected) df = md.DataFrame(data1) - result = df.mode(axis=1).execute().fetch() + result = df.mode(axis=1).execute(**config_kw).fetch(**config_kw) expected = data1.mode(axis=1) pd.testing.assert_frame_equal(result, expected) @@ -744,7 +762,7 @@ def test_mode(setup, check_ref_counts): data2.iloc[[2, 9, 18], [2, 9, 18]] = np.nan df = md.DataFrame(data2) - result = df.mode().execute().fetch() + result = df.mode().execute(**config_kw).fetch(**config_kw) expected = data2.mode() pd.testing.assert_frame_equal(result, expected) @@ -1008,7 +1026,10 @@ def test_dataframe_aggregate(setup, check_ref_counts): mean_9=NamedAgg(9, "mean"), ) result = df.agg(**agg_kw) - pd.testing.assert_frame_equal(result.execute().fetch(), data.agg(**agg_kw)) + pd.testing.assert_frame_equal( + result.execute().fetch(extra_config={"check_shape": False}), + data.agg(**agg_kw), + ) def test_series_aggregate(setup, check_ref_counts): diff --git a/mars/deploy/oscar/session.py b/mars/deploy/oscar/session.py index 218bdce7ad..b5b0abfa8e 100644 --- a/mars/deploy/oscar/session.py +++ b/mars/deploy/oscar/session.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import copy import itertools import logging import json @@ -355,6 +356,7 @@ async def _run_in_background( async def execute(self, *tileables, **kwargs) -> ExecutionInfo: if self._closed: raise RuntimeError("Session closed already") + kwargs = copy.deepcopy(kwargs) fuse_enabled: bool = kwargs.pop("fuse_enabled", None) extra_config: dict = kwargs.pop("extra_config", None) warn_duplicated_execution: bool = kwargs.pop("warn_duplicated_execution", False) diff --git a/mars/deploy/oscar/tests/session.py b/mars/deploy/oscar/tests/session.py index a050d0003f..e46280f2a4 100644 --- a/mars/deploy/oscar/tests/session.py +++ b/mars/deploy/oscar/tests/session.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import copy import inspect import os import uuid @@ -62,6 +63,7 @@ def _process_result(self, tileable, result): return super()._process_result(tileable, result) async def fetch(self, *tileables, **kwargs): + kwargs = copy.deepcopy(kwargs) extra_config = kwargs.pop("extra_config", dict()) if kwargs: unexpected_keys = ", ".join(list(kwargs.keys())) diff --git a/mars/tests/test_session.py b/mars/tests/test_session.py index e4792aaea0..9fe0fa759e 100644 --- a/mars/tests/test_session.py +++ b/mars/tests/test_session.py @@ -301,7 +301,7 @@ def test_iter(setup): raw_data = pd.DataFrame(np.random.randint(1000, size=(20, 10))) df = md.DataFrame(raw_data, chunk_size=5) - for col, series in df.iteritems(): + for col, series in df.items(): pd.testing.assert_series_equal(series.execute().fetch(), raw_data[col]) for i, batch in enumerate(df.iterbatch(batch_size=15)): @@ -331,9 +331,7 @@ def test_iter(setup): pd.testing.assert_series_equal(batch, raw_data.iloc[i * 15 : (i + 1) * 15]) i = 0 - for result_item, expect_item in zip( - s.iteritems(batch_size=15), raw_data.iteritems() - ): + for result_item, expect_item in zip(s.items(batch_size=15), raw_data.items()): assert result_item[0] == expect_item[0] assert result_item[1] == expect_item[1] i += 1