From 1b6ab8950f79ac0eb0012d51dff3572a2eadd49e Mon Sep 17 00:00:00 2001 From: ice1e0 Date: Wed, 21 Oct 2020 22:12:11 +0200 Subject: [PATCH 01/21] use mara_storage for abstraction of local storage interactions --- mara_pipelines/commands/files.py | 43 ++++++++++++++++++++++---- mara_pipelines/config.py | 17 ++++++++++ mara_pipelines/parallel_tasks/files.py | 43 +++++++++++++++++--------- setup.cfg | 2 ++ 4 files changed, 84 insertions(+), 21 deletions(-) diff --git a/mara_pipelines/commands/files.py b/mara_pipelines/commands/files.py index 467e32a..70b9906 100644 --- a/mara_pipelines/commands/files.py +++ b/mara_pipelines/commands/files.py @@ -1,5 +1,6 @@ """Commands for reading files""" +import deprecation import json import pathlib import shlex @@ -9,12 +10,17 @@ import mara_db.dbs import mara_db.shell +import mara_storage.storages +from mara_storage.shell import read_file_command from . import sql from mara_page import _, html from .. import config, pipelines -class Compression(enum.Enum): +@deprecation.deprecated(deprecated_in='3.1.2', removed_in='4.0.0', + current_version=__version__, + details='Use mara_storage.compression.Compression instead') +class Compression(enum.EnumMeta): """Different compression formats that are understood by file readers""" NONE = 'none' GZIP = 'gzip' @@ -22,6 +28,9 @@ class Compression(enum.Enum): ZIP = 'zip' +@deprecation.deprecated(deprecated_in='3.1.2', removed_in='4.0.0', + current_version=__version__, + details='Use mara_storage.compression.uncompressor instead') def uncompressor(compression: Compression) -> str: """Maps compression methods to command line programs that can unpack the respective files""" return {Compression.NONE: 'cat', @@ -35,7 +44,8 @@ class ReadFile(pipelines.Command): def __init__(self, file_name: str, compression: Compression, target_table: str, mapper_script_file_name: str = None, make_unique: bool = False, - db_alias: str = None, csv_format: bool = False, skip_header: bool = False, + db_alias: str = None, 
storage_alias: str = None, + csv_format: bool = False, skip_header: bool = False, delimiter_char: str = None, quote_char: str = None, null_value_string: str = None, timezone: str = None) -> None: super().__init__() @@ -48,6 +58,7 @@ def __init__(self, file_name: str, compression: Compression, target_table: str, self.csv_format = csv_format self.skip_header = skip_header self._db_alias = db_alias + self._storage_alias = storage_alias self.delimiter_char = delimiter_char self.quote_char = quote_char self.null_value_string = null_value_string @@ -56,6 +67,10 @@ def __init__(self, file_name: str, compression: Compression, target_table: str, def db_alias(self): return self._db_alias or config.default_db_alias() + @property + def storage_alias(self): + return self._storage_alias or config.default_storage_alias() + def shell_command(self): copy_from_stdin_command = mara_db.shell.copy_from_stdin_command( self.db_alias(), csv_format=self.csv_format, target_table=self.target_table, @@ -64,14 +79,17 @@ def shell_command(self): null_value_string=self.null_value_string, timezone=self.timezone) if not isinstance(mara_db.dbs.db(self.db_alias()), mara_db.dbs.BigQueryDB): return \ - f'{uncompressor(self.compression)} "{pathlib.Path(config.data_dir()) / self.file_name}" \\\n' \ + f'{read_file_command(self.storage_alias, file_name=self.file_name, compression=self.compression)} \\\n' \ + (f' | {shlex.quote(sys.executable)} "{self.mapper_file_path()}" \\\n' if self.mapper_script_file_name else '') \ + (' | sort -u \\\n' if self.make_unique else '') \ + ' | ' + copy_from_stdin_command else: # Bigquery loading does not support streaming data through pipes - return copy_from_stdin_command + f" {pathlib.Path(config.data_dir()) / self.file_name}" + storage = mara_storage.storages.storage(self.storage_alias) + if not isinstance(storage, mara_storage.storages.LocalStorage): + raise ValueError('The ReadFile to a BigQuery database can only be used from a storage alias of type LocalStorage') + 
return copy_from_stdin_command + f' {shlex.quote(str( (storage.base_path / self.file_name).absolute() ))}' def mapper_file_path(self): return self.parent.parent.base_path() / self.mapper_script_file_name @@ -86,6 +104,7 @@ def html_doc_items(self) -> [(str, str)]: ('make unique', _.tt[self.make_unique]), ('target_table', _.tt[self.target_table]), ('db alias', _.tt[self.db_alias()]), + ('storage alias', _.tt[self.storage_alias]), ('csv format', _.tt[self.csv_format]), ('skip header', _.tt[self.skip_header]), ('delimiter char', @@ -100,22 +119,33 @@ def html_doc_items(self) -> [(str, str)]: class ReadSQLite(sql._SQLCommand): def __init__(self, sqlite_file_name: str, target_table: str, sql_statement: str = None, sql_file_name: str = None, replace: {str: str} = None, - db_alias: str = None, timezone: str = None) -> None: + db_alias: str = None, storage_alias: str = None, timezone: str = None) -> None: + if not isinstance(mara_storage.storages.storage(storage_alias), mara_storage.storages.LocalStorage): + raise ValueError('The ReadSQLite task can only be used from a storage alias of type LocalStorage') sql._SQLCommand.__init__(self, sql_statement, sql_file_name, replace) self.sqlite_file_name = sqlite_file_name self.target_table = target_table self._db_alias = db_alias + self._storage_alias = storage_alias self.timezone = timezone @property def db_alias(self): return self._db_alias or config.default_db_alias() + @property + def storage_alias(self): + return self._storage_alias or config.default_storage_alias() + def shell_command(self): + storage = mara_storage.storages.storage(self.storage_alias) + if not isinstance(storage, mara_storage.storages.LocalStorage): + raise ValueError('The ReadSQLite task can only be used from a storage alias of type LocalStorage') + return (sql._SQLCommand.shell_command(self) + ' | ' + mara_db.shell.copy_command( - mara_db.dbs.SQLiteDB(file_name=config.data_dir().absolute() / self.sqlite_file_name), + 
mara_db.dbs.SQLiteDB(file_name=(storage.base_path / self.sqlite_file_name).absolute()), self.db_alias, self.target_table, timezone=self.timezone)) def html_doc_items(self) -> [(str, str)]: @@ -123,6 +153,7 @@ def html_doc_items(self) -> [(str, str)]: + sql._SQLCommand.html_doc_items(self, None) \ + [('target_table', _.tt[self.target_table]), ('db alias', _.tt[self.db_alias]), + ('storage alias', _.tt[self.storage_alias]), ('time zone', _.tt[self.timezone]), (_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))] diff --git a/mara_pipelines/config.py b/mara_pipelines/config.py index b482eb4..601f5ba 100644 --- a/mara_pipelines/config.py +++ b/mara_pipelines/config.py @@ -1,11 +1,16 @@ """Configuration of data integration pipelines and how to run them""" import datetime +import deprecation import functools import multiprocessing import pathlib import typing +from mara_app.monkey_patch import patch +import mara_storage.config +import mara_storage.storages + from . import pipelines, events @@ -14,6 +19,9 @@ def root_pipeline() -> 'pipelines.Pipeline': return pipelines.demo_pipeline() +@deprecation.deprecated(deprecated_in='3.1.2', removed_in='4.0.0', + current_version=__version__, + details='Use mara_storage.config.storages instead') def data_dir() -> str: """Where to find local data files""" return str(pathlib.Path('data').absolute()) @@ -24,6 +32,15 @@ def default_db_alias() -> str: return 'dwh-etl' +def default_storage_alias() -> str: + """The alias of the storage that should be used when not specified otherwise""" + return 'data' + +@patch(mara_storage.config) +def storages() -> {str: mara_storage.storages.Storage}: + return {'data': mara_storage.storages.LocalStorage(base_path=pathlib.Path(data_dir()))} + + def default_task_max_retries(): """How many times a task is retried when it fails by default """ return 0 diff --git a/mara_pipelines/parallel_tasks/files.py b/mara_pipelines/parallel_tasks/files.py index 5228ee2..dd7d465 100644 --- 
a/mara_pipelines/parallel_tasks/files.py +++ b/mara_pipelines/parallel_tasks/files.py @@ -12,6 +12,7 @@ import mara_db.dbs import mara_db.postgresql from mara_page import _, html +import mara_storage.client from .. import config, pipelines from ..commands import python, sql, files @@ -34,7 +35,7 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read max_number_of_parallel_tasks: int = None, file_dependencies: [str] = None, date_regex: str = None, partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False, commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None, - db_alias: str = None, timezone: str = None) -> None: + db_alias: str = None, storage_alias: str = None, timezone: str = None) -> None: pipelines.ParallelTask.__init__(self, id=id, description=description, max_number_of_parallel_tasks=max_number_of_parallel_tasks, commands_before=commands_before, commands_after=commands_after) @@ -52,21 +53,32 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read self.target_table = target_table self._db_alias = db_alias + self._storage_alias = storage_alias + self.__storage_client = None self.timezone = timezone @property def db_alias(self): return self._db_alias or config.default_db_alias() + @property + def storage_alias(self): + return self._storage_alias or config.default_storage_alias() + + @property + def _storage_client(self): + if not self.__storage_client: + self.__storage_client = mara_storage.client.init_client(self.storage_alias) + + return self.__storage_client + def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None: import more_itertools files = [] # A list of (file_name, date_or_file_name) tuples - data_dir = config.data_dir() first_date = config.first_date() - for file in glob.iglob(str(pathlib.Path(data_dir, self.file_pattern))): - file = str(pathlib.Path(file).relative_to(pathlib.Path(data_dir))) + for file in 
self._storage_client.iterate_files(self.file_pattern): if self.date_regex: match = re.match(self.date_regex, file) if not match: @@ -160,16 +172,12 @@ def update_file_dependencies(): def parallel_commands(self, file_name: str) -> [pipelines.Command]: return [self.read_command(file_name)] + ( [python.RunFunction(function=lambda: _processed_files.track_processed_file( - self.path(), file_name, self._last_modification_timestamp(file_name)))] + self.path(), file_name, self._storage_client.path_last_modification_timestamp(file_name)))] if self.read_mode != ReadMode.ALL else []) - def read_command(self) -> pipelines.Command: + def read_command(self, file_name: str) -> pipelines.Command: raise NotImplementedError - def _last_modification_timestamp(self, file_name): - return datetime.datetime.fromtimestamp( - os.path.getmtime(pathlib.Path(config.data_dir()) / file_name)).astimezone() - class ParallelReadFile(_ParallelRead): def __init__(self, id: str, description: str, file_pattern: str, read_mode: ReadMode, @@ -179,14 +187,14 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None, mapper_script_file_name: str = None, make_unique: bool = False, db_alias: str = None, delimiter_char: str = None, quote_char: str = None, null_value_string: str = None, - skip_header: bool = None, csv_format: bool = False, + skip_header: bool = None, csv_format: bool = False, storage_alias: str = None, timezone: str = None, max_number_of_parallel_tasks: int = None) -> None: _ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern, read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies, date_regex=date_regex, partition_target_table_by_day_id=partition_target_table_by_day_id, truncate_partitions=truncate_partitions, commands_before=commands_before, commands_after=commands_after, - db_alias=db_alias, timezone=timezone, + 
db_alias=db_alias, storage_alias=storage_alias, timezone=timezone, max_number_of_parallel_tasks=max_number_of_parallel_tasks) self.compression = compression self.mapper_script_file_name = mapper_script_file_name or '' @@ -202,7 +210,7 @@ def read_command(self, file_name: str) -> pipelines.Command: mapper_script_file_name=self.mapper_script_file_name, make_unique=self.make_unique, db_alias=self.db_alias, delimiter_char=self.delimiter_char, skip_header=self.skip_header, quote_char=self.quote_char, null_value_string=self.null_value_string, - csv_format=self.csv_format, timezone=self.timezone) + csv_format=self.csv_format, storage_alias=self.storage_alias, timezone=self.timezone) def html_doc_items(self) -> [(str, str)]: path = self.parent.base_path() / self.mapper_script_file_name if self.mapper_script_file_name else '' @@ -219,6 +227,7 @@ def html_doc_items(self) -> [(str, str)]: ('skip header', _.tt[self.skip_header]), ('target_table', _.tt[self.target_table]), ('db alias', _.tt[self.db_alias]), + ('storage alias', _.tt[self.storage_alias]), ('partion target table by day_id', _.tt[self.partition_target_table_by_day_id]), ('truncate partitions', _.tt[self.truncate_partitions]), ('sql delimiter char', @@ -234,18 +243,21 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read target_table: str, file_dependencies: [str] = None, date_regex: str = None, partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False, commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None, - db_alias: str = None, timezone=None, max_number_of_parallel_tasks: int = None) -> None: + db_alias: str = None, timezone=None, max_number_of_parallel_tasks: int = None, + storage_alias: str = None) -> None: _ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern, read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies, date_regex=date_regex, 
partition_target_table_by_day_id=partition_target_table_by_day_id, truncate_partitions=truncate_partitions, commands_before=commands_before, commands_after=commands_after, db_alias=db_alias, + storage_alias=storage_alias, timezone=timezone, max_number_of_parallel_tasks=max_number_of_parallel_tasks) self.sql_file_name = sql_file_name def read_command(self, file_name: str) -> [pipelines.Command]: return files.ReadSQLite(sqlite_file_name=file_name, sql_file_name=self.sql_file_name, - target_table=self.target_table, db_alias=self.db_alias, timezone=self.timezone) + target_table=self.target_table, db_alias=self.db_alias, + storage_alias=self.storage_alias, timezone=self.timezone) def sql_file_path(self): return self.parent.base_path() / self.sql_file_name @@ -262,6 +274,7 @@ def html_doc_items(self) -> [(str, str)]: else '', 'sql')), ('target_table', _.tt[self.target_table]), ('db alias', _.tt[self.db_alias]), + ('storage alias', _.tt[self.storage_alias]), ('partion target table by day_id', _.tt[self.partition_target_table_by_day_id]), ('truncate partitions', _.tt[self.truncate_partitions]), ('time zone', _.tt[self.timezone])] diff --git a/setup.cfg b/setup.cfg index a8a11a9..8f0f529 100644 --- a/setup.cfg +++ b/setup.cfg @@ -14,6 +14,8 @@ python_requires = >= 3.6 install_requires = mara-db>=4.7.1 mara-page>=1.3.0 + mara-storage>=1.0.0 + deprecation>=2.1.0 graphviz>=0.8 python-dateutil>=2.6.1 pythondialog>=3.4.0 From 506c4ece4e107b10983cd076f6f118834c4ac0b8 Mon Sep 17 00:00:00 2001 From: ice1e0 Date: Tue, 27 Oct 2020 20:20:15 +0100 Subject: [PATCH 02/21] several build fixes --- mara_pipelines/commands/files.py | 1 + mara_pipelines/config.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/mara_pipelines/commands/files.py b/mara_pipelines/commands/files.py index 70b9906..f7c6387 100644 --- a/mara_pipelines/commands/files.py +++ b/mara_pipelines/commands/files.py @@ -15,6 +15,7 @@ from . import sql from mara_page import _, html from .. 
import config, pipelines +from . import __version__ @deprecation.deprecated(deprecated_in='3.1.2', removed_in='4.0.0', diff --git a/mara_pipelines/config.py b/mara_pipelines/config.py index 601f5ba..1af4884 100644 --- a/mara_pipelines/config.py +++ b/mara_pipelines/config.py @@ -12,6 +12,7 @@ import mara_storage.storages from . import pipelines, events +from . import __version__ def root_pipeline() -> 'pipelines.Pipeline': @@ -36,7 +37,7 @@ def default_storage_alias() -> str: """The alias of the storage that should be used when not specified otherwise""" return 'data' -@patch(mara_storage.config) +@patch(mara_storage.config.storages) def storages() -> {str: mara_storage.storages.Storage}: return {'data': mara_storage.storages.LocalStorage(base_path=pathlib.Path(data_dir()))} From a530b4c933c5e5d5ccc24028f0c863c1b097dfe9 Mon Sep 17 00:00:00 2001 From: ice1e0 Date: Wed, 28 Oct 2020 19:05:00 +0100 Subject: [PATCH 03/21] improve version handling + set expected release version to 3.2.0 --- mara_pipelines/commands/files.py | 8 ++++---- mara_pipelines/config.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mara_pipelines/commands/files.py b/mara_pipelines/commands/files.py index f7c6387..9758c2f 100644 --- a/mara_pipelines/commands/files.py +++ b/mara_pipelines/commands/files.py @@ -15,11 +15,11 @@ from . import sql from mara_page import _, html from .. import config, pipelines -from . 
import __version__ +import mara_pipelines -@deprecation.deprecated(deprecated_in='3.1.2', removed_in='4.0.0', - current_version=__version__, +@deprecation.deprecated(deprecated_in='3.2.0', removed_in='4.0.0', + current_version=mara_pipelines.__version__, details='Use mara_storage.compression.Compression instead') class Compression(enum.EnumMeta): """Different compression formats that are understood by file readers""" @@ -29,7 +29,7 @@ class Compression(enum.EnumMeta): ZIP = 'zip' -@deprecation.deprecated(deprecated_in='3.1.2', removed_in='4.0.0', +@deprecation.deprecated(deprecated_in='3.2.0', removed_in='4.0.0', current_version=__version__, details='Use mara_storage.compression.uncompressor instead') def uncompressor(compression: Compression) -> str: diff --git a/mara_pipelines/config.py b/mara_pipelines/config.py index 1af4884..7878ec5 100644 --- a/mara_pipelines/config.py +++ b/mara_pipelines/config.py @@ -12,7 +12,7 @@ import mara_storage.storages from . import pipelines, events -from . 
import __version__ +import mara_pipelines def root_pipeline() -> 'pipelines.Pipeline': @@ -20,8 +20,8 @@ def root_pipeline() -> 'pipelines.Pipeline': return pipelines.demo_pipeline() -@deprecation.deprecated(deprecated_in='3.1.2', removed_in='4.0.0', - current_version=__version__, +@deprecation.deprecated(deprecated_in='3.2.0', removed_in='4.0.0', + current_version=mara_pipelines.__version__, details='Use mara_storage.config.storages instead') def data_dir() -> str: """Where to find local data files""" From 7278b0a308849f48f12f0e77444166e5bfd5cd2b Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Sun, 19 Jun 2022 14:06:42 +0200 Subject: [PATCH 04/21] upgrade to mara-storage 1.0.0 --- mara_pipelines/parallel_tasks/files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mara_pipelines/parallel_tasks/files.py b/mara_pipelines/parallel_tasks/files.py index dd7d465..c1e4978 100644 --- a/mara_pipelines/parallel_tasks/files.py +++ b/mara_pipelines/parallel_tasks/files.py @@ -68,7 +68,7 @@ def storage_alias(self): @property def _storage_client(self): if not self.__storage_client: - self.__storage_client = mara_storage.client.init_client(self.storage_alias) + self.__storage_client = mara_storage.client.StorageClient(self.storage_alias) return self.__storage_client From ff187b8e9fd58ba785d365b253c55cd741273c56 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Sun, 19 Jun 2022 14:06:49 +0200 Subject: [PATCH 05/21] fix _ParallelRead since merge of mara_storage #55 --- mara_pipelines/parallel_tasks/files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mara_pipelines/parallel_tasks/files.py b/mara_pipelines/parallel_tasks/files.py index c1e4978..af90145 100644 --- a/mara_pipelines/parallel_tasks/files.py +++ b/mara_pipelines/parallel_tasks/files.py @@ -172,7 +172,7 @@ def update_file_dependencies(): def parallel_commands(self, file_name: str) -> [pipelines.Command]: return [self.read_command(file_name)] + ( [python.RunFunction(function=lambda: 
_processed_files.track_processed_file( - self.path(), file_name, self._storage_client.path_last_modification_timestamp(file_name)))] + self.path(), file_name, self._storage_client.last_modification_timestamp(file_name)))] if self.read_mode != ReadMode.ALL else []) def read_command(self, file_name: str) -> pipelines.Command: From a0de9d4ca2574fa3018c5ce0735aee5230789f6c Mon Sep 17 00:00:00 2001 From: ice1e0 Date: Wed, 28 Oct 2020 19:05:00 +0100 Subject: [PATCH 06/21] improve version handling + set expected release version to 3.2.0 --- mara_pipelines/commands/files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mara_pipelines/commands/files.py b/mara_pipelines/commands/files.py index 9758c2f..7814e02 100644 --- a/mara_pipelines/commands/files.py +++ b/mara_pipelines/commands/files.py @@ -30,7 +30,7 @@ class Compression(enum.EnumMeta): @deprecation.deprecated(deprecated_in='3.2.0', removed_in='4.0.0', - current_version=__version__, + current_version=mara_pipelines.__version__, details='Use mara_storage.compression.uncompressor instead') def uncompressor(compression: Compression) -> str: """Maps compression methods to command line programs that can unpack the respective files""" From f3f177ab18f440edb469441a947ecf3848d83c87 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Mon, 25 Apr 2022 14:47:54 +0200 Subject: [PATCH 07/21] add execution context --- mara_pipelines/commands/python.py | 5 ++- mara_pipelines/commands/sql.py | 5 ++- mara_pipelines/config.py | 12 ++++++ mara_pipelines/contexts/__init__.py | 55 +++++++++++++++++++++++++ mara_pipelines/contexts/bash.py | 8 ++++ mara_pipelines/contexts/ssh.py | 23 +++++++++++ mara_pipelines/execution.py | 47 ++++++++++++++++++--- mara_pipelines/pipelines.py | 64 ++++++++++++++++++----------- mara_pipelines/shell.py | 5 ++- 9 files changed, 190 insertions(+), 34 deletions(-) create mode 100644 mara_pipelines/contexts/__init__.py create mode 100644 mara_pipelines/contexts/bash.py create mode 100644 
mara_pipelines/contexts/ssh.py diff --git a/mara_pipelines/commands/python.py b/mara_pipelines/commands/python.py index fc36e49..4cf86c8 100644 --- a/mara_pipelines/commands/python.py +++ b/mara_pipelines/commands/python.py @@ -8,6 +8,7 @@ from typing import Union, Callable, List from ..incremental_processing import file_dependencies from ..logging import logger +from ..contexts import ExecutionContext from mara_page import html, _ from .. import pipelines @@ -78,7 +79,7 @@ def file_name(self): def args(self): return self._args() if callable(self._args) else self._args - def run(self) -> bool: + def run(self, context: ExecutionContext = None) -> bool: dependency_type = 'ExecutePython ' + self.file_name if self.file_dependencies: assert (self.parent) @@ -89,7 +90,7 @@ def run(self) -> bool: logger.log('no changes') return True - if not super().run(): + if not super().run(context=context): return False if self.file_dependencies: diff --git a/mara_pipelines/commands/sql.py b/mara_pipelines/commands/sql.py index db04f08..bd512b1 100644 --- a/mara_pipelines/commands/sql.py +++ b/mara_pipelines/commands/sql.py @@ -14,6 +14,7 @@ from ..incremental_processing import file_dependencies from ..incremental_processing import incremental_copy_status from ..logging import logger +from ..contexts import ExecutionContext class _SQLCommand(pipelines.Command): @@ -102,7 +103,7 @@ def __init__(self, sql_statement: str = None, sql_file_name: Union[str, Callable def db_alias(self): return self._db_alias or config.default_db_alias() - def run(self) -> bool: + def run(self, context: ExecutionContext = None) -> bool: if self.sql_file_name: logger.log(self.sql_file_name, logger.Format.ITALICS) @@ -124,7 +125,7 @@ def run(self) -> bool: # probably not be there (usually the first step is a DROP). 
file_dependencies.delete(self.node_path(), dependency_type) - if not super().run(): + if not super().run(context=context): return False if self.file_dependencies: diff --git a/mara_pipelines/config.py b/mara_pipelines/config.py index 7878ec5..00022c5 100644 --- a/mara_pipelines/config.py +++ b/mara_pipelines/config.py @@ -12,6 +12,8 @@ import mara_storage.storages from . import pipelines, events +from .contexts import ExecutionContext +from .contexts.bash import BashExecutionContext import mara_pipelines @@ -67,6 +69,16 @@ def bash_command_string() -> str: return '/usr/bin/env bash -o pipefail' +def default_execution_context() -> str: + """Sets the default execution context""" + return 'bash' + + +def execution_contexts() -> {str: ExecutionContext}: + """The available execution contexts""" + return {'bash': BashExecutionContext()} + + def system_statistics_collection_period() -> typing.Union[float, None]: """ How often should system statistics be collected in seconds. diff --git a/mara_pipelines/contexts/__init__.py b/mara_pipelines/contexts/__init__.py new file mode 100644 index 0000000..db860d1 --- /dev/null +++ b/mara_pipelines/contexts/__init__.py @@ -0,0 +1,55 @@ +from .. import shell + + +class ExecutionContext: + """The execution context for a shell command""" + def __init__(self) -> None: + self._active = False + + @property + def is_active(self) -> bool: + return self._active + + def __enter__(self): # -> ExecutionContext: + """ + Enters the execution context. + + This place can be used to spin up cloud resources. + """ + self._active = True + return self + + def __exit__(self, type, value, traceback) -> bool: + """Exits the execution context freeing up used resources.""" + self._active = False + return True + + def _test_active(self): + """Test if the current context is active""" + if not self.is_active: + raise Exception('The current context is not activated. 
Call mycontext.__enter__ before using this method') + + def run_shell_command(self, shell_command: str) -> bool: + """Executes a shell command in the context""" + raise NotImplementedError() + + +class _LocalShellExecutionContext(ExecutionContext): + """Runs the shell commands in a context through the local shell""" + def __init__(self): + self.bash_command_string: str = None + + def run_shell_command(self, shell_command: str) -> bool: + self._test_active() + + # logger.log(f'{self.bash_command_string} -c {shlex.quote(shell_command)}', format=logger.Format.ITALICS) + return shell.run_shell_command(shell_command, bash_command_string=self.bash_command_string) + + +def context(alias: str) -> ExecutionContext: + """Returns an execution context by alias""" + from .. import config + execution_contexts = config.execution_contexts() + if alias not in execution_contexts: + raise KeyError(f'execution context alias "{alias}" not configured') + return execution_contexts[alias] diff --git a/mara_pipelines/contexts/bash.py b/mara_pipelines/contexts/bash.py new file mode 100644 index 0000000..df6dd7a --- /dev/null +++ b/mara_pipelines/contexts/bash.py @@ -0,0 +1,8 @@ +from . import _LocalShellExecutionContext +from .. import config + + +class BashExecutionContext(_LocalShellExecutionContext): + """Runs the shell commands in the local bash shell""" + def __init__(self, bash_command_string: str = None): + self.bash_command_string = bash_command_string or config.bash_command_string() or '/usr/bin/env bash -o pipefail' diff --git a/mara_pipelines/contexts/ssh.py b/mara_pipelines/contexts/ssh.py new file mode 100644 index 0000000..959bf26 --- /dev/null +++ b/mara_pipelines/contexts/ssh.py @@ -0,0 +1,23 @@ +import shlex + +from . 
import _LocalShellExecutionContext + + +class SshExecutionContext(_LocalShellExecutionContext): + """Runs the shell commands on a remote host via ssh""" + def __init__(self, host: str, port: int = None, user: str = None, identity_file: str = None, configfile: str = None): + """ + Args: + host: the remote ssh host + port: the ssh port. By default 22 + user: the remote user + identity_file: the identity file for the user login + configfile: a custom config file for ssh + """ + self.bash_command_string = ('/usr/bin/env ssh ' + + (f'-c {configfile} ' if configfile else '') + + (f'-i {identity_file} ' if identity_file else '') + + (f'{user}@' if user else '') + + host + + (f':{port}' if port else '') + + f' {shlex.quote("/usr/bin/env bash -o pipefail ")}') diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py index 57b6294..2439561 100644 --- a/mara_pipelines/execution.py +++ b/mara_pipelines/execution.py @@ -17,7 +17,7 @@ from multiprocessing.context import BaseContext from queue import Empty -from . import pipelines, config +from . import pipelines, config, contexts from .logging import logger, pipeline_events, system_statistics, run_log, node_cost from . import events @@ -67,6 +67,18 @@ def run(): statistics_process: multiprocessing.Process = None + def exit_contexts(active_contexts: {str: contexts.ExecutionContext}, exception: Exception = None): + for context_alias, context in active_contexts.items(): + try: + print(f"exit execution context '{context_alias}'") + if exception: + context.__exit__(type(exception), exception, exception.__traceback__) + else: + context.__exit__(None, None, None) + except e: + print(f"failed to exit execution context '{context_alias}'. 
Exception: {e}") + pass + try: # capture output of print statements and other unplanned output logger.redirect_output(event_queue, pipeline.path()) @@ -107,6 +119,8 @@ def with_all_upstreams(nodes: {pipelines.Node}): # queue whole pipeline queue([pipeline]) + # execution contexts + active_contexts: {str: contexts.ExecutionContext} = {} # book keeping run_start_time = datetime.datetime.now(tz.utc) # all nodes that already ran or that won't be run anymore @@ -262,6 +276,20 @@ def track_finished_pipelines(): logger.redirect_output(event_queue, pipeline.path()) else: + # initialize context + next_node_context = next_node.context() or config.default_execution_context() + if next_node_context not in active_contexts: + # enter context + new_context = contexts.context(next_node_context) + + # TODO add better logging here + print(f"enter execution context '{next_node_context}'") + + if not new_context.__enter__() or not new_context.is_active: + raise Exception(f'Could not enter execution context {next_node_context}') + + active_contexts[next_node_context] = new_context + # run a task in a subprocess if next_node.parent in running_pipelines: running_pipelines[next_node.parent][1] += 1 @@ -272,7 +300,8 @@ def track_finished_pipelines(): message='★ ' + node_cost.format_duration( node_durations_and_run_times.get(tuple(next_node.path()), [0, 0])[0]))) - process = TaskProcess(next_node, event_queue, multiprocessing_context) + status_queue = multiprocessing_context.Queue() + process = TaskProcess(next_node, event_queue, status_queue, active_contexts[next_node_context]) process.start() running_task_processes[next_node] = process @@ -306,10 +335,17 @@ def track_finished_pipelines(): # don't busy-wait time.sleep(0.001) - except: + except e: + # exit active contexts + exit_contexts(active_contexts, exception=e) + event_queue.put(pipeline_events.Output(node_path=pipeline.path(), message=traceback.format_exc(), format=logger.Format.ITALICS, is_error=True)) + finally: + # exit active 
contexts + exit_contexts(active_contexts) + # run again because `dequeue` might have moved more nodes to `finished_nodes` track_finished_pipelines() @@ -424,7 +460,7 @@ def initialize_run_logger() -> events.EventHandler: class TaskProcess: - def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, multiprocessing_context: BaseContext): + def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, multiprocessing_context: BaseContext, context: contexts.ExecutionContext): """ Runs a task in a separate sub process. @@ -440,6 +476,7 @@ def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, mul self.task = task self.event_queue = event_queue self._status_queue = multiprocessing_context.Queue() + self.context = context self.start_time = datetime.datetime.now(tz.utc) self._succeeded: bool = None @@ -451,7 +488,7 @@ def run(self): attempt = 0 try: while True: - if not self.task.run(): + if not self.task.run(context=self.context): max_retries = self.task.max_retries or config.default_task_max_retries() if attempt < max_retries: attempt += 1 diff --git a/mara_pipelines/pipelines.py b/mara_pipelines/pipelines.py index aec1272..0c26f8c 100644 --- a/mara_pipelines/pipelines.py +++ b/mara_pipelines/pipelines.py @@ -4,17 +4,19 @@ import typing from . import config +from .contexts import ExecutionContext class Node(): """Base class for pipeline elements""" - def __init__(self, id: str, description: str, labels: {str: str} = None) -> None: + def __init__(self, id: str, description: str, labels: {str: str} = None, context: str = None) -> None: if not re.match('^[a-z0-9_]+$', id): raise ValueError(f'Invalid id "{id}". 
Should only contain lowercase letters, numbers and "_".') self.id: str = id self.description: str = description self.labels: {str: str} = labels or {} + self._context: str = context self.upstreams: {'Node'} = set() self.downstreams: {'Node'} = set() @@ -38,6 +40,12 @@ def url_path(self) -> str: """Returns a uri fragment for referring to nodes""" return '/'.join(self.path()) or None + def context(self) -> str: + if self._context: + return self._context + if self.parent: + return self.parent.context() + def __repr__(self): return f'<{self.__class__.__name__} "{self.id}">' @@ -51,16 +59,23 @@ class Command(): """ parent: Node = None - def run(self) -> bool: + def run(self, context: ExecutionContext = None) -> bool: """ Runs the command + Args: + context: The execution context for the shell command. If not set the local shell is used + Returns: False on failure """ - from . import shell shell_command = self.shell_command() + if context: + return context.run_shell_command(shell_command) + + from . 
import shell + # logger.log(f'{config.bash_command_string()} -c {shlex.quote(shell_command)}', format=logger.Format.ITALICS) return shell.run_shell_command(shell_command) @@ -81,8 +96,8 @@ def html_doc_items(self) -> [(str, str)]: class Task(Node): - def __init__(self, id: str, description: str, commands: [Command] = None, max_retries: int = None) -> None: - super().__init__(id, description) + def __init__(self, id: str, description: str, commands: [Command] = None, max_retries: int = None, context: str = None) -> None: + super().__init__(id, description, context=context) self.commands = [] self.max_retries = max_retries @@ -100,17 +115,18 @@ def add_commands(self, commands: [Command]): for command in commands: self.add_command(command) - def run(self): + def run(self, context: ExecutionContext = None): for command in self.commands: - if not command.run(): + if not command.run(context=context): return False return True class ParallelTask(Node): def __init__(self, id: str, description: str, max_number_of_parallel_tasks: int = None, - commands_before: [Command] = None, commands_after: [Command] = None) -> None: - super().__init__(id, description) + commands_before: [Command] = None, commands_after: [Command] = None, + context: str = None) -> None: + super().__init__(id, description, context=context) self.commands_before = [] for command in commands_before or []: self.add_command_before(command) @@ -149,18 +165,6 @@ def html_doc_items(self) -> [(str, str)]: class Pipeline(Node): - """ - A directed acyclic graph (DAG) of nodes with dependencies between them. - - Args: - id: The id of the pipeline - description: A short summary of what the pipeline is doing - max_number_of_parallel_tasks: Only that many nodes of the pipeline will run in parallel - base_path: The absolute path of the pipeline root, file names are relative to that - labels: An arbitrary dictionary application specific tags, schemas and so on. 
- ignore_errors: When true, then the pipeline execution will not fail when a child node fails - force_run_all_children: When true, child nodes will run even when their upstreams failed - """ nodes: {str: Node} = None initial_node: Node = None final_node: Node = None @@ -171,8 +175,22 @@ def __init__(self, id: str, base_path: pathlib.Path = None, labels: {str: str} = None, ignore_errors: bool = False, - force_run_all_children: bool = False) -> None: - super().__init__(id, description, labels) + force_run_all_children: bool = False, + context: str = None) -> None: + """ + A directed acyclic graph (DAG) of nodes with dependencies between them. + + Args: + id: The id of the pipeline + description: A short summary of what the pipeline is doing + max_number_of_parallel_tasks: Only that many nodes of the pipeline will run in parallel + base_path: The absolute path of the pipeline root, file names are relative to that + labels: An arbitrary dictionary application specific tags, schemas and so on. + ignore_errors: When true, then the pipeline execution will not fail when a child node fails + force_run_all_children: When true, child nodes will run even when their upstreams failed + context: The execution context for the pipeline + """ + super().__init__(id, description, labels, context=context) self.nodes = {} self._base_path = base_path self.max_number_of_parallel_tasks = max_number_of_parallel_tasks diff --git a/mara_pipelines/shell.py b/mara_pipelines/shell.py index eeb4ce3..c7beaf9 100644 --- a/mara_pipelines/shell.py +++ b/mara_pipelines/shell.py @@ -6,13 +6,14 @@ from .logging import logger -def run_shell_command(command: str, log_command: bool = True): +def run_shell_command(command: str, log_command: bool = True, bash_command_string: str = None): """ Runs a command in a bash shell and logs the output of the command in (near)real-time. 
Args: command: The command to run log_command: When true, then the command itself is logged before execution + bash_command_string: The command used for running a bash, should somehow include the `pipefail` option Returns: Either (in order) @@ -25,7 +26,7 @@ def run_shell_command(command: str, log_command: bool = True): if log_command: logger.log(command, format=logger.Format.ITALICS) - process = subprocess.Popen(shlex.split(config.bash_command_string()) + ['-c', command], + process = subprocess.Popen(shlex.split(bash_command_string or config.bash_command_string()) + ['-c', command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) From 367d9f083bf2697a8455ed2a6ce02fea65bf7022 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Tue, 26 Apr 2022 08:24:56 +0200 Subject: [PATCH 08/21] ssh add password option and fix pipefail option execution --- mara_pipelines/contexts/ssh.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/mara_pipelines/contexts/ssh.py b/mara_pipelines/contexts/ssh.py index 959bf26..26acad1 100644 --- a/mara_pipelines/contexts/ssh.py +++ b/mara_pipelines/contexts/ssh.py @@ -5,19 +5,22 @@ class SshExecutionContext(_LocalShellExecutionContext): """Runs the shell commands on a remote host via ssh""" - def __init__(self, host: str, port: int = None, user: str = None, identity_file: str = None, configfile: str = None): + def __init__(self, host: str, port: int = None, user: str = None, password: str = None, identity_file: str = None, configfile: str = None): """ Args: host: the remote ssh post port: the ssh port. By default 22 user: the remote user + password: the remote user password. 
identity_file: the identity file for the user login configfile: a custom config file for ssh """ - self.bash_command_string = ('/usr/bin/env ssh ' + self.bash_command_string = ('/usr/bin/env ' + + (f'sshpass -p {shlex.quote(password)} ' if password and not identity_file else '') + + 'ssh ' + (f'-c {configfile} ' if configfile else '') + (f'-i {identity_file} ' if identity_file else '') + (f'{user}@' if user else '') + host + (f':{port}' if port else '') - + f' {shlex.quote("/usr/bin/env bash -o pipefail ")}') + + f' {shlex.quote("set -o pipefail ; ")}') From 161edac860745422a4c70fa1cc4b258e1723f27f Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Tue, 26 Apr 2022 11:10:17 +0200 Subject: [PATCH 09/21] fix ssh execution context + move to contexts.bash module --- mara_pipelines/contexts/bash.py | 29 +++++++++++++++++++++++++++++ mara_pipelines/contexts/ssh.py | 26 -------------------------- 2 files changed, 29 insertions(+), 26 deletions(-) delete mode 100644 mara_pipelines/contexts/ssh.py diff --git a/mara_pipelines/contexts/bash.py b/mara_pipelines/contexts/bash.py index df6dd7a..34c1183 100644 --- a/mara_pipelines/contexts/bash.py +++ b/mara_pipelines/contexts/bash.py @@ -1,3 +1,6 @@ +import shlex +import typing as t + from . import _LocalShellExecutionContext from .. import config @@ -6,3 +9,29 @@ class BashExecutionContext(_LocalShellExecutionContext): """Runs the shell commands in the local bash shell""" def __init__(self, bash_command_string: str = None): self.bash_command_string = bash_command_string or config.bash_command_string() or '/usr/bin/env bash -o pipefail' + + +class SshBashExecutionContext(_LocalShellExecutionContext): + """Runs the shell commands in a bash shell on a remote host using ssh""" + def __init__(self, host: str, port: int = None, user: str = None, password: str = None, identity_file: str = None, + configfile: str = None, options: t.List[str] = None): + """ + Args: + host: the remote ssh host + port: the ssh port. 
By default 22 + user: the remote user + password: the remote user password. + identity_file: the identity file for the user login + configfile: a custom config file for ssh + options: a list of SSH options to be used + """ + self.bash_command_string = ('/usr/bin/env ' + + (f'sshpass -p {shlex.quote(password)} ' if password and not identity_file else '') + + 'ssh ' + + (f'-F {str(configfile)} ' if configfile else '') + + (f'-i {identity_file} ' if identity_file else '') + + (' '.join(f"-o '{o}'" for o in options)+' ' if options else '') + + (f'{user}@' if user else '') + + host + + (f':{port}' if port else '') + + f' bash -o pipefail') diff --git a/mara_pipelines/contexts/ssh.py b/mara_pipelines/contexts/ssh.py deleted file mode 100644 index 26acad1..0000000 --- a/mara_pipelines/contexts/ssh.py +++ /dev/null @@ -1,26 +0,0 @@ -import shlex - -from . import _LocalShellExecutionContext - - -class SshExecutionContext(_LocalShellExecutionContext): - """Runs the shell commands on a remote host via ssh""" - def __init__(self, host: str, port: int = None, user: str = None, password: str = None, identity_file: str = None, configfile: str = None): - """ - Args: - host: the remote ssh post - port: the ssh port. By default 22 - user: the remote user - password: the remote user password. 
- identity_file: the identity file for the user login - configfile: a custom config file for ssh - """ - self.bash_command_string = ('/usr/bin/env ' - + (f'sshpass -p {shlex.quote(password)} ' if password and not identity_file else '') - + 'ssh ' - + (f'-c {configfile} ' if configfile else '') - + (f'-i {identity_file} ' if identity_file else '') - + (f'{user}@' if user else '') - + host - + (f':{port}' if port else '') - + f' {shlex.quote("set -o pipefail ; ")}') From c53cc529800a6545b1e4a854a8ef757ce3f13a46 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Tue, 26 Apr 2022 21:57:00 +0200 Subject: [PATCH 10/21] add docker execution context --- mara_pipelines/contexts/docker.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 mara_pipelines/contexts/docker.py diff --git a/mara_pipelines/contexts/docker.py b/mara_pipelines/contexts/docker.py new file mode 100644 index 0000000..36e8229 --- /dev/null +++ b/mara_pipelines/contexts/docker.py @@ -0,0 +1,15 @@ +from . import _LocalShellExecutionContext + + +class DockerExecutionContext(_LocalShellExecutionContext): + """Runs the shell commands in a bash shell on a remote host using ssh""" + def __init__(self, container: str, context: str = None): + """ + Args: + container: the docker container name + context: the docker context + """ + self.bash_command_string = ('/usr/bin/env docker ' + + (f'--context={context} ' if context else '') + + f'exec -i {container}' + + f' bash -o pipefail -c') From 21daa2ec3ea54c510974226630c6f31fa5fe72db Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Wed, 27 Apr 2022 13:42:49 +0200 Subject: [PATCH 11/21] fix Copy command execution context --- mara_pipelines/commands/sql.py | 39 ++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/mara_pipelines/commands/sql.py b/mara_pipelines/commands/sql.py index bd512b1..84c5b3a 100644 --- a/mara_pipelines/commands/sql.py +++ b/mara_pipelines/commands/sql.py @@ -14,7 +14,7 @@ from 
..incremental_processing import file_dependencies from ..incremental_processing import incremental_copy_status from ..logging import logger -from ..contexts import ExecutionContext +from ..contexts import ExecutionContext, _LocalShellExecutionContext class _SQLCommand(pipelines.Command): @@ -168,7 +168,7 @@ def target_db_alias(self): def file_path(self) -> pathlib.Path: return self.parent.parent.base_path() / self.sql_file_name - def run(self) -> bool: + def run(self, context: ExecutionContext = None) -> bool: if self.sql_file_name: logger.log(self.sql_file_name, logger.Format.ITALICS) @@ -188,7 +188,7 @@ def run(self) -> bool: # (see also above in ExecuteSQL) file_dependencies.delete(self.node_path(), dependency_type) - if not super().run(): + if not super().run(context=context): return False if self.file_dependencies: @@ -264,15 +264,22 @@ def __init__(self, source_db_alias: str, source_table: str, def target_db_alias(self): return self._target_db_alias or config.default_db_alias() - def run(self) -> bool: + def run(self, context: ExecutionContext = None) -> bool: + if isinstance(context, _LocalShellExecutionContext): + run_shell_command = context.run_shell_command + elif context is None: + run_shell_command = shell.run_shell_command + else: + raise ValueError('The context must inherit type _LocalShellExecutionContext') + # retrieve the highest current value for the modification comparison (e.g.: the highest timestamp) # We intentionally use the command line here (rather than sqlalchemy) to avoid forcing people python drivers, # which can be hard for example in the case of SQL Server logger.log(f'Get new max modification comparison value...', format=logger.Format.ITALICS) max_value_query = f'SELECT max({self.modification_comparison}) AS maxval FROM {self.source_table}' logger.log(max_value_query, format=logger.Format.VERBATIM) - result = shell.run_shell_command(f'echo {shlex.quote(max_value_query)} \\\n | ' - + 
mara_db.shell.copy_to_stdout_command(self.source_db_alias)) + result = run_shell_command(f'echo {shlex.quote(max_value_query)} \\\n | ' + + mara_db.shell.copy_to_stdout_command(self.source_db_alias)) if not result: return False @@ -324,7 +331,7 @@ def run(self) -> bool: # overwrite the comparison criteria to get everything replace = {self.comparison_value_placeholder: '(1=1)'} complete_copy_command = self._copy_command(self.target_table, replace) - if not shell.run_shell_command(complete_copy_command): + if not run_shell_command(complete_copy_command): return False else: @@ -333,8 +340,8 @@ def run(self) -> bool: create_upsert_table_query = (f'DROP TABLE IF EXISTS {self.target_table}_upsert;\n' + f'CREATE TABLE {self.target_table}_upsert AS SELECT * from {self.target_table} WHERE FALSE') - if not shell.run_shell_command(f'echo {shlex.quote(create_upsert_table_query)} \\\n | ' - + mara_db.shell.query_command(self.target_db_alias)): + if not run_shell_command(f'echo {shlex.quote(create_upsert_table_query)} \\\n | ' + + mara_db.shell.query_command(self.target_db_alias)): return False # perform the actual copy replacing the placeholder @@ -342,7 +349,7 @@ def run(self) -> bool: modification_comparison_type = self.modification_comparison_type or '' replace = {self.comparison_value_placeholder: f'({self.modification_comparison} >= {modification_comparison_type} \'{last_comparison_value}\')'} - if not shell.run_shell_command(self._copy_command(self.target_table + '_upsert', replace)): + if not run_shell_command(self._copy_command(self.target_table + '_upsert', replace)): return False # now the upsert table has to be merged with the target one @@ -371,11 +378,11 @@ def run(self) -> bool: SELECT src.* FROM {self.target_table}_upsert src WHERE NOT EXISTS (SELECT 1 FROM {self.target_table} dst WHERE {key_definition})""" - if not shell.run_shell_command(f'echo {shlex.quote(update_query)} \\\n | ' - + mara_db.shell.query_command(self.target_db_alias)): + if not 
run_shell_command(f'echo {shlex.quote(update_query)} \\\n | ' + + mara_db.shell.query_command(self.target_db_alias)): return False - elif not shell.run_shell_command(f'echo {shlex.quote(insert_query)} \\\n | ' - + mara_db.shell.query_command(self.target_db_alias)): + elif not run_shell_command(f'echo {shlex.quote(insert_query)} \\\n | ' + + mara_db.shell.query_command(self.target_db_alias)): return False else: upsery_query = f""" @@ -384,8 +391,8 @@ def run(self) -> bool: FROM {self.target_table}_upsert ON CONFLICT ({key_definition}) DO UPDATE SET {set_clause}""" - if not shell.run_shell_command(f'echo {shlex.quote(upsery_query)} \\\n | ' - + mara_db.shell.query_command(self.target_db_alias)): + if not run_shell_command(f'echo {shlex.quote(upsery_query)} \\\n | ' + + mara_db.shell.query_command(self.target_db_alias)): return False # update data_integration_incremental_copy_status From 39b1d561969857428324b5962e54eb7ebfdd5735 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Wed, 4 May 2022 21:24:46 +0200 Subject: [PATCH 12/21] use task.run(context=context) where implemented + use optimistic pattern in CopyIncrementally this will work around issues where task.run does not implement the context pattern e.g. RunFunction. 
These commands will always run locally --- mara_pipelines/commands/sql.py | 9 ++------- mara_pipelines/execution.py | 8 +++++++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/mara_pipelines/commands/sql.py b/mara_pipelines/commands/sql.py index 84c5b3a..90608d9 100644 --- a/mara_pipelines/commands/sql.py +++ b/mara_pipelines/commands/sql.py @@ -14,7 +14,7 @@ from ..incremental_processing import file_dependencies from ..incremental_processing import incremental_copy_status from ..logging import logger -from ..contexts import ExecutionContext, _LocalShellExecutionContext +from ..contexts import ExecutionContext class _SQLCommand(pipelines.Command): @@ -265,12 +265,7 @@ def target_db_alias(self): return self._target_db_alias or config.default_db_alias() def run(self, context: ExecutionContext = None) -> bool: - if isinstance(context, _LocalShellExecutionContext): - run_shell_command = context.run_shell_command - elif context is None: - run_shell_command = shell.run_shell_command - else: - raise ValueError('The context must inherit type _LocalShellExecutionContext') + run_shell_command = context.run_shell_command if context else shell.run_shell_command # retrieve the highest current value for the modification comparison (e.g.: the highest timestamp) # We intentionally use the command line here (rather than sqlalchemy) to avoid forcing people python drivers, diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py index 2439561..504b378 100644 --- a/mara_pipelines/execution.py +++ b/mara_pipelines/execution.py @@ -479,6 +479,12 @@ def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, mul self.context = context self.start_time = datetime.datetime.now(tz.utc) self._succeeded: bool = None + self.run_kargs = {} + + # add dynamic kargs for self.task.run(...) 
+ from inspect import signature + if 'context' in signature(task.run).parameters: + self.run_kargs['context'] = context def run(self): # redirect stdout and stderr to queue @@ -488,7 +494,7 @@ def run(self): attempt = 0 try: while True: - if not self.task.run(context=self.context): + if not self.task.run(**self.run_kargs): max_retries = self.task.max_retries or config.default_task_max_retries() if attempt < max_retries: attempt += 1 From 21fb8929e70511798e307d7d0f084d22a79a08df Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Thu, 5 May 2022 15:06:16 +0200 Subject: [PATCH 13/21] fix passing args/kargs to task/command.run --- mara_pipelines/commands/python.py | 6 +++--- mara_pipelines/commands/sql.py | 12 ++++++------ mara_pipelines/pipelines.py | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/mara_pipelines/commands/python.py b/mara_pipelines/commands/python.py index 4cf86c8..ee77cb6 100644 --- a/mara_pipelines/commands/python.py +++ b/mara_pipelines/commands/python.py @@ -30,7 +30,7 @@ def __init__(self, function: Callable = None, args: [str] = None, file_dependenc self.args = args or [] self.file_dependencies = file_dependencies or [] - def run(self) -> bool: + def run(self, *args, **kargs) -> bool: dependency_type = 'RunFunction ' + self.function.__name__ if self.file_dependencies: assert (self.parent) @@ -79,7 +79,7 @@ def file_name(self): def args(self): return self._args() if callable(self._args) else self._args - def run(self, context: ExecutionContext = None) -> bool: + def run(self, *args, **kargs) -> bool: dependency_type = 'ExecutePython ' + self.file_name if self.file_dependencies: assert (self.parent) @@ -90,7 +90,7 @@ def run(self, context: ExecutionContext = None) -> bool: logger.log('no changes') return True - if not super().run(context=context): + if not super().run(*args, **kargs): return False if self.file_dependencies: diff --git a/mara_pipelines/commands/sql.py b/mara_pipelines/commands/sql.py index 
90608d9..d101e9b 100644 --- a/mara_pipelines/commands/sql.py +++ b/mara_pipelines/commands/sql.py @@ -103,7 +103,7 @@ def __init__(self, sql_statement: str = None, sql_file_name: Union[str, Callable def db_alias(self): return self._db_alias or config.default_db_alias() - def run(self, context: ExecutionContext = None) -> bool: + def run(self, *args, **kargs) -> bool: if self.sql_file_name: logger.log(self.sql_file_name, logger.Format.ITALICS) @@ -125,7 +125,7 @@ def run(self, context: ExecutionContext = None) -> bool: # probably not be there (usually the first step is a DROP). file_dependencies.delete(self.node_path(), dependency_type) - if not super().run(context=context): + if not super().run(*args, **kargs): return False if self.file_dependencies: @@ -168,7 +168,7 @@ def target_db_alias(self): def file_path(self) -> pathlib.Path: return self.parent.parent.base_path() / self.sql_file_name - def run(self, context: ExecutionContext = None) -> bool: + def run(self, *args, **kargs) -> bool: if self.sql_file_name: logger.log(self.sql_file_name, logger.Format.ITALICS) @@ -188,7 +188,7 @@ def run(self, context: ExecutionContext = None) -> bool: # (see also above in ExecuteSQL) file_dependencies.delete(self.node_path(), dependency_type) - if not super().run(context=context): + if not super().run(*args, **kargs): return False if self.file_dependencies: @@ -264,8 +264,8 @@ def __init__(self, source_db_alias: str, source_table: str, def target_db_alias(self): return self._target_db_alias or config.default_db_alias() - def run(self, context: ExecutionContext = None) -> bool: - run_shell_command = context.run_shell_command if context else shell.run_shell_command + def run(self, *args, **kargs) -> bool: + run_shell_command = kargs['context'].run_shell_command if 'context' in kargs else shell.run_shell_command # retrieve the highest current value for the modification comparison (e.g.: the highest timestamp) # We intentionally use the command line here (rather than sqlalchemy) 
to avoid forcing people python drivers, diff --git a/mara_pipelines/pipelines.py b/mara_pipelines/pipelines.py index 0c26f8c..c32072b 100644 --- a/mara_pipelines/pipelines.py +++ b/mara_pipelines/pipelines.py @@ -59,7 +59,7 @@ class Command(): """ parent: Node = None - def run(self, context: ExecutionContext = None) -> bool: + def run(self, *args, **kargs) -> bool: """ Runs the command @@ -71,8 +71,8 @@ def run(self, context: ExecutionContext = None) -> bool: """ shell_command = self.shell_command() - if context: - return context.run_shell_command(shell_command) + if 'context' in kargs: + return kargs['context'].run_shell_command(shell_command) from . import shell @@ -115,9 +115,9 @@ def add_commands(self, commands: [Command]): for command in commands: self.add_command(command) - def run(self, context: ExecutionContext = None): + def run(self, *args, **kargs): for command in self.commands: - if not command.run(context=context): + if not command.run(*args, **kargs): return False return True From 1e1609a648fc0c77964b4e2365868210c70f1237 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Thu, 5 May 2022 16:12:01 +0200 Subject: [PATCH 14/21] clean code --- mara_pipelines/commands/python.py | 1 - mara_pipelines/commands/sql.py | 1 - mara_pipelines/execution.py | 2 +- mara_pipelines/pipelines.py | 2 -- 4 files changed, 1 insertion(+), 5 deletions(-) diff --git a/mara_pipelines/commands/python.py b/mara_pipelines/commands/python.py index ee77cb6..97475d6 100644 --- a/mara_pipelines/commands/python.py +++ b/mara_pipelines/commands/python.py @@ -8,7 +8,6 @@ from typing import Union, Callable, List from ..incremental_processing import file_dependencies from ..logging import logger -from ..contexts import ExecutionContext from mara_page import html, _ from .. 
import pipelines diff --git a/mara_pipelines/commands/sql.py b/mara_pipelines/commands/sql.py index d101e9b..66dfa16 100644 --- a/mara_pipelines/commands/sql.py +++ b/mara_pipelines/commands/sql.py @@ -14,7 +14,6 @@ from ..incremental_processing import file_dependencies from ..incremental_processing import incremental_copy_status from ..logging import logger -from ..contexts import ExecutionContext class _SQLCommand(pipelines.Command): diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py index 504b378..f8d6259 100644 --- a/mara_pipelines/execution.py +++ b/mara_pipelines/execution.py @@ -16,6 +16,7 @@ from multiprocessing import queues from multiprocessing.context import BaseContext from queue import Empty +from inspect import signature from . import pipelines, config, contexts from .logging import logger, pipeline_events, system_statistics, run_log, node_cost @@ -482,7 +483,6 @@ def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, mul self.run_kargs = {} # add dynamic kargs for self.task.run(...) - from inspect import signature if 'context' in signature(task.run).parameters: self.run_kargs['context'] = context diff --git a/mara_pipelines/pipelines.py b/mara_pipelines/pipelines.py index c32072b..e9e936d 100644 --- a/mara_pipelines/pipelines.py +++ b/mara_pipelines/pipelines.py @@ -4,7 +4,6 @@ import typing from . import config -from .contexts import ExecutionContext class Node(): @@ -75,7 +74,6 @@ def run(self, *args, **kargs) -> bool: return kargs['context'].run_shell_command(shell_command) from . 
import shell - # logger.log(f'{config.bash_command_string()} -c {shlex.quote(shell_command)}', format=logger.Format.ITALICS) return shell.run_shell_command(shell_command) From a939b8d9d5b91314adfeff2bee350a52d7dfd938 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Thu, 5 May 2022 16:20:32 +0200 Subject: [PATCH 15/21] use cache for contexts.context(alias) function --- mara_pipelines/contexts/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mara_pipelines/contexts/__init__.py b/mara_pipelines/contexts/__init__.py index db860d1..4bef758 100644 --- a/mara_pipelines/contexts/__init__.py +++ b/mara_pipelines/contexts/__init__.py @@ -1,3 +1,5 @@ +import functools + from .. import shell @@ -46,6 +48,7 @@ def run_shell_command(self, shell_command: str) -> bool: return shell.run_shell_command(shell_command, bash_command_string=self.bash_command_string) +@functools.lru_cache(maxsize=None) def context(alias: str) -> ExecutionContext: """Returns a execution config by alias""" from .. import config From a7272c4d514e2d34e70767f385ccd7b67dd59986 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Tue, 17 May 2022 14:53:43 +0200 Subject: [PATCH 16/21] fix run --- mara_pipelines/execution.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py index f8d6259..e71ed33 100644 --- a/mara_pipelines/execution.py +++ b/mara_pipelines/execution.py @@ -483,7 +483,8 @@ def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, mul self.run_kargs = {} # add dynamic kargs for self.task.run(...) 
- if 'context' in signature(task.run).parameters: + task_run_signature = signature(task.run) + if 'context' in task_run_signature.parameters or 'kargs' in task_run_signature.parameters: self.run_kargs['context'] = context def run(self): From ce2e06a483adb547724e0b189462068ccf0cb979 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Fri, 20 May 2022 10:30:39 +0200 Subject: [PATCH 17/21] fix support legacy commands run --- mara_pipelines/pipelines.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/mara_pipelines/pipelines.py b/mara_pipelines/pipelines.py index e9e936d..54cd321 100644 --- a/mara_pipelines/pipelines.py +++ b/mara_pipelines/pipelines.py @@ -113,10 +113,16 @@ def add_commands(self, commands: [Command]): for command in commands: self.add_command(command) - def run(self, *args, **kargs): + def run(self, *args, **kargs) -> bool: + from inspect import signature for command in self.commands: - if not command.run(*args, **kargs): - return False + if signature(command.run).parameters: + if not command.run(*args, **kargs): + return False + else: + # call run for legacy commands which do not implement parameter *args and **kargs + if not command.run(): + return False return True From fbc6f959eb1d85af7d1da8ddd4aa3801e43a23d0 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Mon, 13 Jun 2022 17:37:09 +0200 Subject: [PATCH 18/21] support executing scripts through docker context V2 adding suffix Bash to docker execution context class names --- mara_pipelines/contexts/docker.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/mara_pipelines/contexts/docker.py b/mara_pipelines/contexts/docker.py index 36e8229..adc0627 100644 --- a/mara_pipelines/contexts/docker.py +++ b/mara_pipelines/contexts/docker.py @@ -1,15 +1,33 @@ from . 
import _LocalShellExecutionContext


-class DockerExecutionContext(_LocalShellExecutionContext):
-    """Runs the shell commands in a bash shell on a remote host using ssh"""
+class DockerBashExecutionContext(_LocalShellExecutionContext):
+    """Runs the shell commands in a bash shell on a docker container"""
     def __init__(self, container: str, context: str = None):
         """
         Args:
             container: the docker container name
             context: the docker context
         """
-        self.bash_command_string = ('/usr/bin/env docker '
-                                    + (f'--context={context} ' if context else '')
-                                    + f'exec -i {container}'
+        self.bash_command_string = ('/usr/bin/env docker'
+                                    + (f' --context={context}' if context else '')
+                                    + f' exec -i {container}'
                                     + f' bash -o pipefail -c')
+
+
+class DockerComposeBashExecutionContext(_LocalShellExecutionContext):
+    """Runs the shell commands in a bash shell on a docker container inside a docker compose project"""
+    def __init__(self, container: str, context: str = None, compose_project_name: str = None):
+        """
+        Args:
+            container: the docker container name
+            context: the docker context
+            compose_project_name: the docker compose project name
+        """
+        self.bash_command_string = ('/usr/bin/env '
+                                    + (f'COMPOSE_PROJECT_NAME={compose_project_name} ' if compose_project_name else '')
+                                    + 'docker'
+                                    + (f' --context={context}' if context else '')
+                                    + ' compose'
+                                    + f' exec -i {container}'
+                                    + f' bash -o pipefail -c')

From 99e11d863e925761a785df4273fedcffa427221e Mon Sep 17 00:00:00 2001
From: Leo Schick
Date: Wed, 15 Jun 2022 09:39:40 +0200
Subject: [PATCH 19/21] fix duplicated arg.
-c --- mara_pipelines/contexts/docker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mara_pipelines/contexts/docker.py b/mara_pipelines/contexts/docker.py index adc0627..c496e4b 100644 --- a/mara_pipelines/contexts/docker.py +++ b/mara_pipelines/contexts/docker.py @@ -12,7 +12,7 @@ def __init__(self, container: str, context: str = None): self.bash_command_string = ('/usr/bin/env docker' + (f' --context={context}' if context else '') + f' exec -i {container}' - + f' bash -o pipefail -c') + + f' bash -o pipefail') class DockerComposeBashExecutionContext(_LocalShellExecutionContext): @@ -30,4 +30,4 @@ def __init__(self, container: str, context: str = None, compose_project_name: st + (f' --context={context}' if context else '') + ' compose' + f' exec -i {container}' - + f' bash -o pipefail -c') + + f' bash -o pipefail') From b926b32d37a6f06cdaaa54462512f7922199518e Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Mon, 11 Jul 2022 16:47:05 +0200 Subject: [PATCH 20/21] fix pipeline hang when context is not configured --- mara_pipelines/execution.py | 59 ++++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py index e71ed33..f53c306 100644 --- a/mara_pipelines/execution.py +++ b/mara_pipelines/execution.py @@ -67,6 +67,8 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None, def run(): statistics_process: multiprocessing.Process = None + # execution contexts + active_contexts: {str: contexts.ExecutionContext} = {} def exit_contexts(active_contexts: {str: contexts.ExecutionContext}, exception: Exception = None): for context_alias, context in active_contexts.items(): @@ -120,8 +122,6 @@ def with_all_upstreams(nodes: {pipelines.Node}): # queue whole pipeline queue([pipeline]) - # execution contexts - active_contexts: {str: contexts.ExecutionContext} = {} # book keeping run_start_time = datetime.datetime.now(tz.utc) # all 
nodes that already ran or that won't be run anymore @@ -277,34 +277,47 @@ def track_finished_pipelines(): logger.redirect_output(event_queue, pipeline.path()) else: - # initialize context - next_node_context = next_node.context() or config.default_execution_context() - if next_node_context not in active_contexts: - # enter context - new_context = contexts.context(next_node_context) - - # TODO add better logging here - print(f"enter execution context '{next_node_context}'") - - if not new_context.__enter__() or not new_context.is_active: - raise Exception(f'Could not enter execution context {next_node_context}') - - active_contexts[next_node_context] = new_context - # run a task in a subprocess + task_start_time = datetime.datetime.now(tz.utc) if next_node.parent in running_pipelines: running_pipelines[next_node.parent][1] += 1 event_queue.put( - pipeline_events.NodeStarted(next_node.path(), datetime.datetime.now(tz.utc), False)) + pipeline_events.NodeStarted(next_node.path(), task_start_time, False)) event_queue.put(pipeline_events.Output( node_path=next_node.path(), format=logger.Format.ITALICS, message='★ ' + node_cost.format_duration( node_durations_and_run_times.get(tuple(next_node.path()), [0, 0])[0]))) - status_queue = multiprocessing_context.Queue() - process = TaskProcess(next_node, event_queue, status_queue, active_contexts[next_node_context]) - process.start() - running_task_processes[next_node] = process + # initialize context + next_node_context = next_node.context() or config.default_execution_context() + if next_node_context not in active_contexts: + # enter context + try: + logger.log(message=f"enter execution context '{next_node_context}'", format=logger.Format.STANDARD) + + new_context = contexts.context(next_node_context) + + if not new_context.__enter__() or not new_context.is_active: + raise Exception(f'Could not enter execution context {next_node_context}') + + active_contexts[next_node_context] = new_context + except Exception as e: + 
logger.log(message=f"Could not initiate execution context", format=logger.Format.ITALICS, + is_error=True) + logger.log(message=traceback.format_exc(), + format=pipeline_events.Output.Format.VERBATIM, is_error=True) + event_queue.put(pipeline_events.NodeFinished( + node_path=next_node.path(), start_time=task_start_time, + end_time=datetime.datetime.now(tz.utc), is_pipeline=False, succeeded=False)) + + failed_pipelines.add(next_node.parent) + processed_nodes.add(next_node) + + if next_node_context in active_contexts: + status_queue = multiprocessing_context.Queue() + process = TaskProcess(next_node, event_queue, status_queue, active_contexts[next_node_context]) + process.start() + running_task_processes[next_node] = process # check whether some of the running processes finished for task_process in list(running_task_processes.values()): # type: TaskProcess @@ -347,8 +360,8 @@ def track_finished_pipelines(): # exit active contexts exit_contexts(active_contexts) - # run again because `dequeue` might have moved more nodes to `finished_nodes` - track_finished_pipelines() + # run again because `dequeue` might have moved more nodes to `finished_nodes` + track_finished_pipelines() if statistics_process: # kill the stats process (joining or terminating does not work in gunicorn) From cfd89fdeb81286bc249c350cc817c68934a12569 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Fri, 24 Feb 2023 09:17:24 +0100 Subject: [PATCH 21/21] use always logger, not print --- mara_pipelines/execution.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py index f53c306..8f6feb4 100644 --- a/mara_pipelines/execution.py +++ b/mara_pipelines/execution.py @@ -73,14 +73,15 @@ def run(): def exit_contexts(active_contexts: {str: contexts.ExecutionContext}, exception: Exception = None): for context_alias, context in active_contexts.items(): try: - print(f"exit execution context '{context_alias}'") + logger.log(f"exit 
execution context '{context_alias}'", + format=logger.Format.ITALICS) if exception: context.__exit__(type(exception), exception, exception.__traceback__) else: context.__exit__(None, None, None) except e: - print(f"failed to exit execution context '{context_alias}'. Exception: {e}") - pass + logger.log(f"failed to exit execution context '{context_alias}'.\nException: {e}", + format=logger.Format.ITALICS, is_error=True) try: # capture output of print statements and other unplanned output @@ -263,7 +264,7 @@ def track_finished_pipelines(): except Exception as e: event_queue.put(pipeline_events.NodeStarted( node_path=next_node.path(), start_time=task_start_time, is_pipeline=True)) - logger.log(message=f'Could not launch parallel tasks', format=logger.Format.ITALICS, + logger.log(message='Could not launch parallel tasks', format=logger.Format.ITALICS, is_error=True) logger.log(message=traceback.format_exc(), format=pipeline_events.Output.Format.VERBATIM, is_error=True) @@ -302,7 +303,7 @@ def track_finished_pipelines(): active_contexts[next_node_context] = new_context except Exception as e: - logger.log(message=f"Could not initiate execution context", format=logger.Format.ITALICS, + logger.log(message="Could not initiate execution context", format=logger.Format.ITALICS, is_error=True) logger.log(message=traceback.format_exc(), format=pipeline_events.Output.Format.VERBATIM, is_error=True)