Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cli command 'mara catalog connect' #3

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ docs/_build/

# Environments
/.venv

/mara_config.py
3 changes: 2 additions & 1 deletion mara_catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ def MARA_CONFIG_MODULES():


def MARA_CLICK_COMMANDS():
return []
from . import cli
return [cli.mara_catalog]
110 changes: 110 additions & 0 deletions mara_catalog/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Auto-migrate command line interface"""

import sys

import click

from . import config
from .connect import connect_catalog_mara_commands


@click.group()
def mara_catalog():
"""Commands to interact with data lakes and lakehouses"""
pass


@mara_catalog.command()
@click.option('--catalog',
help="The catalog to connect. If not given, all catalogs will be connected.")
@click.option('--db-alias',
help='The database the catalog(s) shall be connected to. If not given, the default db alias is used.')
@click.option('--disable-colors', default=False, is_flag=True,
help='Output logs without coloring them.')
def connect(
catalog: str = None,
db_alias: str = None,

# from mara_pipelines.ui.cli.run_pipeline
disable_colors: bool= False
):
"""
Connects a data lake or lakehouse catalog to a database

Args:
catalog: The catalog to connect to. If not set, all configured catalogs will be connected.
db_alias: The db alias the catalog shall be connected to. If not set, the default db alias is taken.

disable_colors: If true, don't use escape sequences to make the log colorful (default: colorful logging)
"""

from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.commands.python import RunFunction
import mara_pipelines.cli
import mara_pipelines.config

# create pipeline
pipeline = Pipeline(
id='_mara_catalog_connect',
description="Connects a catalog with a database")

def create_schema_if_not_exist(db_alias: str, schema_name: str) -> bool:
import sqlalchemy
import sqlalchemy.schema
import mara_db.sqlalchemy_engine

eng = mara_db.sqlalchemy_engine.engine(db_alias)

with eng.connect() as conn:
insp = sqlalchemy.inspect(eng)
if insp.has_schema(schema_name=schema_name, connection=conn):
print(f'Schema {schema_name} already exists')
else:
create_schema = sqlalchemy.schema.CreateSchema(schema_name)
print(create_schema)
conn.execute(create_schema)
conn.commit()

return True

_catalogs = config.catalogs() # make sure to call the function once
for catalog_name in [catalog] or _catalogs:
catalog_pipeline = Pipeline(
id=catalog_name,
description=f"Connect catalog {catalog_name}")

if catalog_name not in _catalogs:
raise ValueError(f"Could not find catalog '{catalog_name}' in the registered catalogs. Please check your configured values for 'mara_catalog.config.catalogs'.")
catalog = _catalogs[catalog_name]

if catalog.tables:
schemas = list(set([table.get('schema', catalog.default_schema) for table in catalog.tables]))

for schema_name in schemas:
# create schema if it does not exist
print(f'found schema: {schema_name}')
catalog_pipeline.add_initial(
Task(id='create_schema',
description=f'Creates the schema {schema_name} if it does not exist',
commands=[
RunFunction(
function=create_schema_if_not_exist,
args=[
mara_pipelines.config.default_db_alias(),
schema_name
])]))

catalog_pipeline.add(
Task(id='create_tables',
description=f'Create tables for schema {catalog.default_schema}',
commands=connect_catalog_mara_commands(catalog=catalog,
db_alias=db_alias or mara_pipelines.config.default_db_alias(),
or_replace=True)))

pipeline.add(catalog_pipeline)

# run connect pipeline
if mara_pipelines.cli.run_pipeline(pipeline, disable_colors=disable_colors):
sys.exit(0)
else:
sys.exit(1)
16 changes: 5 additions & 11 deletions mara_catalog/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,11 @@ def __(db: dbs.SnowflakeDB, table_format: formats.Format) -> Tuple[str, Dict[str
raise NotImplementedError(f'The format {table_format} is not supported for SnowflakeDB')


def connect_catalog_mara_commands(catalog: Union[str, StorageCatalog], db_alias: str,
or_replace: bool = False) -> Iterable[Union[Command, List[Command]]]:
def connect_catalog_mara_commands(
catalog: Union[str, StorageCatalog],
db_alias: str,
or_replace: bool = False
) -> Iterable[Command]:
"""
Returns a list of commands which connects a table list as external storage.

Expand Down Expand Up @@ -210,12 +213,3 @@ def connect_catalog_mara_commands(catalog: Union[str, StorageCatalog], db_alias:
format_name=format_name, or_replace=or_replace, options=format_options)

yield ExecuteSQL(sql_statement, db_alias=db_alias)

#yield Task(
# id=table_to_id(schema_name, table_name),
# description=f"Connect table {schema_name}.{table_name} to db {db_alias}",
# commands=[ExecuteSQL(sql_statement, db_alias=db_alias)])


def table_to_id(schema_name, table_name) -> str:
return f'{schema_name}_{table_name}'.lower()
5 changes: 5 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ test =
pytest-dependency
mara_app>=2.3.0
mara-db[postgres,mssql]>=4.9.2
mara-pipelines>=3.5.0

[options.entry_points]
mara.commands =
catalog = mara_catalog.cli:mara_catalog
Loading