Skip to content

Commit

Permalink
Resolve compatibility issue for kubernetes cases under CGroup2 (#3303)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored Feb 6, 2023
1 parent 9f0bf34 commit 37c08a6
Show file tree
Hide file tree
Showing 56 changed files with 204 additions and 151 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/benchmark-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
shell: bash
run: |
source ./ci/install-conda.sh
python -m pip install --upgrade pip "setuptools<64" wheel coverage;
python -m pip install --upgrade pip setuptools wheel coverage;
- name: Install dependencies
id: build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/core-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
shell: bash
run: |
source ./ci/install-conda.sh
python -m pip install --upgrade pip "setuptools<64" wheel coverage;
python -m pip install --upgrade pip setuptools wheel coverage;
- name: Install dependencies
env:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/os-compat-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
shell: bash
run: |
source ./ci/install-conda.sh
python -m pip install --upgrade pip "setuptools<64" wheel coverage;
python -m pip install --upgrade pip setuptools wheel coverage;
- name: Install dependencies
env:
Expand Down
12 changes: 7 additions & 5 deletions .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
python-version: [3.8-ray, 3.8-ray-deploy, 3.8-ray-dag, 3.8-vineyard, 3.8-dask]
python-version: [3.8-kubernetes, 3.8-ray, 3.8-ray-deploy, 3.8-ray-dag, 3.8-vineyard, 3.8-dask]
include:
- { os: ubuntu-20.04, python-version: 3.8-kubernetes, no-common-tests: 1,
- { os: ubuntu-latest, python-version: 3.8-kubernetes, no-common-tests: 1,
no-deploy: 1, with-kubernetes: "with Kubernetes" }
- { os: ubuntu-20.04, python-version: 3.8-hadoop, no-common-tests: 1,
no-deploy: 1, with-hadoop: "with hadoop" }
Expand All @@ -47,13 +47,14 @@ jobs:
shell: bash
run: |
source ./ci/install-conda.sh
python -m pip install --upgrade pip "setuptools<64" wheel coverage;
python -m pip install --upgrade pip setuptools wheel coverage
- name: Start minikube
if: ${{ matrix.with-kubernetes }}
with:
driver: none
uses: medyagh/setup-minikube@master
with:
driver: docker
mount-path: '/home/runner:/home/runner'

- name: Install dependencies
env:
Expand Down Expand Up @@ -125,6 +126,7 @@ jobs:
RUN_DASK: ${{ matrix.run-dask }}
NO_COMMON_TESTS: ${{ matrix.no-common-tests }}
NUMPY_EXPERIMENTAL_ARRAY_FUNCTION: 1
USE_MINIKUBE_DOCKER_ENV: true
CHANGE_MINIKUBE_NONE_USER: true
shell: bash
run: |
Expand Down
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
# do compatibility test for earliest supported pandas release
if [[ "$(mars.test.module)" == "dataframe" ]]; then
pip install numpy\<1.24.0
pip install numpy\<1.24.0 sqlalchemy\<2.0
pip install -i https://pkgs.dev.azure.com/mars-project/mars/_packaging/pandas/pypi/simple/ pandas==1.0.5
pytest $PYTEST_CONFIG -m pd_compat mars/dataframe
mv .coverage build/.coverage.pd_compat.file
Expand Down
2 changes: 0 additions & 2 deletions benchmarks/tpch/gen_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def to_parquet(args):
def generate(
tables, SCALE_FACTOR, folder, upload_to_s3, validate_dataset, num_processes
):

if upload_to_s3:
assert "AWS_ACCESS_KEY_ID" in os.environ, "AWS credentials not set"
else:
Expand All @@ -117,7 +116,6 @@ def generate(
fs = s3fs.S3FileSystem()

for table_name, (table_short, num_pieces, load_func) in tables.items():

if upload_to_s3:
output_prefix = f"s3://{folder}/{table_name}.pq"
else:
Expand Down
4 changes: 3 additions & 1 deletion mars/core/entity/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def _thread_body(self):
fut.set_result(None)
except (RuntimeError, ConnectionError, KeyError, ActorNotExist):
fut.set_result(None)
except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except
except (
Exception
) as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except
fut.set_exception(ex)
finally:
del session
Expand Down
4 changes: 1 addition & 3 deletions mars/dataframe/base/shift.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,7 @@ def _tile_series(cls, op):
to_concats = [c]
left = abs(op.periods)
while left > 0 and 0 <= prev_i < inp.chunk_shape[0]:
prev_chunk = inp.cix[
prev_i,
]
prev_chunk = inp.cix[prev_i,]
size = min(left, prev_chunk.shape[0])
left -= size
prev_i = prev_i - 1 if inc else prev_i + 1
Expand Down
1 change: 0 additions & 1 deletion mars/dataframe/base/to_numeric.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@


class DataFrameToNumeric(DataFrameOperand, DataFrameOperandMixin):

errors = StringField("errors")
downcast = StringField("downcast")

Expand Down
4 changes: 1 addition & 3 deletions mars/dataframe/datasource/from_tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,7 @@ def _tile_input_1d_tileables(cls, op: "DataFrameFromTensor"):
index_value = parse_index(pd_index, store_data=True)
else:
assert op.index is not None
index_chunk = in_tensors[-1].cix[
i,
]
index_chunk = in_tensors[-1].cix[i,]
index_value = parse_index(
pd.Index([], dtype=index_chunk.dtype),
index_chunk,
Expand Down
2 changes: 1 addition & 1 deletion mars/dataframe/datasource/read_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def _tile_partitioned(cls, op: "DataFrameReadParquet"):
out_df = op.outputs[0]
shape = (np.nan, out_df.shape[1])
dtypes = cls._to_arrow_dtypes(out_df.dtypes, op)
dataset = pq.ParquetDataset(op.path)
dataset = pq.ParquetDataset(op.path, use_legacy_dataset=True)

path_prefix = _parse_prefix(op.path)

Expand Down
37 changes: 22 additions & 15 deletions mars/dataframe/datasource/read_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@
from ...tensor.utils import normalize_chunk_sizes
from ...typing import OperandType, TileableType
from ..arrays import ArrowStringDtype
from ..utils import parse_index, create_sa_connection, to_arrow_dtypes
from ..utils import (
parse_index,
create_sa_connection,
to_arrow_dtypes,
patch_sa_engine_execute,
)
from .core import (
IncrementalIndexDatasource,
ColumnPruneSupportedDataSourceMixin,
Expand Down Expand Up @@ -127,7 +132,6 @@ def _get_selectable(self, engine_or_conn, columns=None):
selectable = sa.Table(
self.table_or_sql,
m,
autoload=True,
autoload_with=engine_or_conn,
schema=self.schema,
)
Expand All @@ -141,12 +145,10 @@ def _get_selectable(self, engine_or_conn, columns=None):
.alias(temp_name_2)
)
else:
selectable = sql.select(
"*",
from_obj=sql.text(
f"({self.table_or_sql}) AS {temp_name_1}"
),
).alias(temp_name_2)
from_tb = sql.text(f"({self.table_or_sql}) AS {temp_name_1}")
selectable = (
sql.select("*").select_from(from_tb).alias(temp_name_2)
)
self.selectable = selectable
return selectable

Expand All @@ -155,11 +157,15 @@ def _collect_info(self, engine_or_conn, selectable, columns, test_rows):

# fetch test DataFrame
if columns:
query = sql.select(
[sql.column(c) for c in columns], from_obj=selectable
).limit(test_rows)
query = (
sql.select(*[sql.column(c) for c in columns])
.select_from(selectable)
.limit(test_rows)
)
else:
query = sql.select(selectable.columns, from_obj=selectable).limit(test_rows)
query = (
sql.select(*selectable.columns).select_from(selectable).limit(test_rows)
)
test_df = pd.read_sql(
query,
engine_or_conn,
Expand All @@ -178,7 +184,7 @@ def _collect_info(self, engine_or_conn, selectable, columns, test_rows):
# fetch size
size = list(
engine_or_conn.execute(
sql.select([sql.func.count()]).select_from(selectable)
sql.select(sql.func.count()).select_from(selectable)
)
)[0][0]
shape = (size, test_df.shape[1])
Expand Down Expand Up @@ -352,7 +358,7 @@ def _tile_partition(cls, op: "DataFrameReadSQL"):
try:
part_col = selectable.columns[op.partition_col]
range_results = engine.execute(
sql.select([sql.func.min(part_col), sql.func.max(part_col)])
sql.select(sql.func.min(part_col), sql.func.max(part_col))
)

op.low_limit, op.high_limit = next(range_results)
Expand Down Expand Up @@ -429,6 +435,7 @@ def _adapt_datetime(dt):

out = op.outputs[0]

patch_sa_engine_execute()
engine = sa.create_engine(op.con, **(op.engine_kwargs or dict()))
try:
selectable = op._get_selectable(engine)
Expand All @@ -444,7 +451,7 @@ def _adapt_datetime(dt):
op.low_limit = _adapt_datetime(op.low_limit)
op.high_limit = _adapt_datetime(op.high_limit)

query = sa.sql.select(columns)
query = sa.sql.select(*columns)
if op.method == "partition":
part_col = selectable.columns[op.partition_col]
if op.left_end:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ def test_read_sql_execution(setup):
result = r.execute().fetch()
pd.testing.assert_frame_equal(result, expected)

table = sa.Table(table_name, m, autoload=True, autoload_with=engine)
table = sa.Table(table_name, m, autoload_with=engine)
r = md.read_sql_table(
table,
engine,
Expand Down
2 changes: 2 additions & 0 deletions mars/dataframe/datastore/tests/test_datastore_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from .... import dataframe as md
from ....tests.core import flaky
from ... import DataFrame
from ...utils import patch_sa_engine_execute


def test_to_csv_execution(setup):
Expand Down Expand Up @@ -130,6 +131,7 @@ def test_to_sql():
index=index,
)

patch_sa_engine_execute()
with tempfile.TemporaryDirectory() as d:
table_name1 = "test_table"
table_name2 = "test_table2"
Expand Down
2 changes: 2 additions & 0 deletions mars/dataframe/datastore/to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
build_empty_df,
build_empty_series,
create_sa_connection,
patch_sa_engine_execute,
)


Expand Down Expand Up @@ -171,6 +172,7 @@ def execute(cls, ctx, op: "DataFrameToSQLTable"):

import sqlalchemy as sa

patch_sa_engine_execute()
engine = sa.create_engine(op.con, **(op.engine_kwargs or dict()))

try:
Expand Down
5 changes: 1 addition & 4 deletions mars/dataframe/groupby/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,7 @@ def _gen_map_chunks(
by = []
for v in map_op.groupby_params["by"]:
if isinstance(v, ENTITY_TYPE):
by_chunk = v.cix[
chunk.index[0],
]
by_chunk = v.cix[chunk.index[0],]
chunk_inputs.append(by_chunk)
by.append(by_chunk)
else:
Expand Down Expand Up @@ -554,7 +552,6 @@ def _gen_pivot_chunk(
sample_chunks: List[ChunkType],
agg_chunk_len: int,
):

properties = dict(
by=op.groupby_params["by"],
gpu=op.is_gpu(),
Expand Down
4 changes: 1 addition & 3 deletions mars/dataframe/groupby/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,7 @@ def tile(cls, op):
chunk_by = []
for k in by:
if isinstance(k, SERIES_TYPE):
by_chunk = k.cix[
chunk.index[0],
]
by_chunk = k.cix[chunk.index[0],]
chunk_by.append(by_chunk)
chunk_inputs.append(by_chunk)
else:
Expand Down
4 changes: 1 addition & 3 deletions mars/dataframe/indexing/setitem.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,7 @@ def tile(cls, op: "DataFrameSetitem"):
value_chunks, shape=shape, dtypes=dtypes
)
else:
value_chunk = value.cix[
c.index[0],
]
value_chunk = value.cix[c.index[0],]

chunk_inputs = [c, value_chunk]

Expand Down
4 changes: 1 addition & 3 deletions mars/dataframe/indexing/where.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ def get_tiled_chunk(obj, index, axis=None):
return obj.cix[index[0], index[1]]
elif isinstance(obj, SERIES_TYPE):
axis = axis if axis is not None else op.axis
return obj.cix[
index[axis],
]
return obj.cix[index[axis],]
else:
return obj

Expand Down
20 changes: 20 additions & 0 deletions mars/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1566,3 +1566,23 @@ def restore_func(ctx: Context, op):
logger.info("%s func %s is restored.", op, op.func)
else:
op.func = cloudpickle.loads(op.func)


def patch_sa_engine_execute():
"""
pandas did not resolve compatibility issue of sqlalchemy 2.0, the issue
is https://github.com/pandas-dev/pandas/issues/40686. We need to patch
Engine class in SQLAlchemy, and then our code can work well.
"""
try:
from sqlalchemy.engine import Engine
except ImportError: # pragma: no cover
return

def execute(self, statement, *multiparams, **params):
connection = self.connect()
return connection.execute(statement, *multiparams, **params)

if hasattr(Engine, "execute"): # pragma: no cover
return
Engine.execute = execute
4 changes: 3 additions & 1 deletion mars/deploy/kubernetes/tests/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ def _remove_docker_image(image_name, raises=True):


def _load_docker_env():
if os.path.exists("/var/run/docker.sock") or not shutil.which("minikube"):
if "USE_MINIKUBE_DOCKER_ENV" not in os.environ and (
os.path.exists("/var/run/docker.sock") or not shutil.which("minikube")
):
return

proc = subprocess.Popen(["minikube", "docker-env"], stdout=subprocess.PIPE)
Expand Down
12 changes: 3 additions & 9 deletions mars/learn/cluster/_k_means_elkan_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,15 +402,9 @@ def tile(cls, op: "KMeansElkanUpdate"):
out_chunks = [list() for _ in range(op.output_limit)]
for i in range(x.chunk_shape[0]):
x_chunk = x.cix[i, 0]
sample_weight_chunk = sample_weight.cix[
i,
]
labels_chunk = labels.cix[
i,
]
upper_bounds_chunk = upper_bounds.cix[
i,
]
sample_weight_chunk = sample_weight.cix[i,]
labels_chunk = labels.cix[i,]
upper_bounds_chunk = upper_bounds.cix[i,]
lower_bounds_chunk = lower_bounds.cix[i, 0]
chunk_op = op.copy().reset_key()
chunk_op.stage = OperandStage.map
Expand Down
12 changes: 3 additions & 9 deletions mars/learn/cluster/_k_means_lloyd_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,9 @@ def tile(cls, op: "KMeansLloydUpdate"):
labels_chunks, centers_new_chunks, weight_in_clusters_chunks = [], [], []
for i in range(x.chunk_shape[0]):
x_chunk = x.cix[i, 0]
sample_weight_chunk = sample_weight.cix[
i,
]
x_squared_norms_chunk = x_squared_norms.cix[
i,
]
labels_chunk = labels.cix[
i,
]
sample_weight_chunk = sample_weight.cix[i,]
x_squared_norms_chunk = x_squared_norms.cix[i,]
labels_chunk = labels.cix[i,]
chunk_op = op.copy().reset_key()
chunk_op.stage = OperandStage.map
chunk_kws = [
Expand Down
Loading

0 comments on commit 37c08a6

Please sign in to comment.