From c6f8c6430015b3d9728536858dc6e3bb8aae227c Mon Sep 17 00:00:00 2001 From: vcapelca Date: Thu, 21 Dec 2017 13:08:19 +0100 Subject: [PATCH] Release 3.0.0/Added 'local' mode for Hersir -Removed default_configuration.py to avoid coupling with our templates definition -Removed swamp & hdfs_deploy_folder params to avoid coupling, they should be used as --extra parameters now -Added -l --local mode to run Hersir from inside EMR (no AWS or ssh calls should be used) -Added -j --job-file-name on Cooper to be able to avoid interactive mode if one knows which .properties file should be called by Oozie -Updated requirements.txt to add new cx_Oracle version 6.1 (Dec 2017) --- requirements.txt | 2 +- setup.py | 2 +- .../cli/interactive/default_configuration.py | 53 ------------------- src/slippinj/cli/scripts/anabasii.py | 35 ++++++++---- src/slippinj/cli/scripts/basic_script.py | 5 -- src/slippinj/cli/scripts/cooper.py | 14 +++-- src/slippinj/cli/scripts/tlacuilo.py | 24 ++++----- src/slippinj/di.py | 2 - src/slippinj/emr/cluster.py | 8 ++- src/slippinj/emr/deploy.py | 42 ++++++++------- tests/slippinj/cli/scripts/test_anabasii.py | 17 +++--- tests/slippinj/cli/scripts/test_cooper.py | 4 +- tests/slippinj/cli/scripts/test_tlacuilo.py | 2 +- tests/slippinj/emr/test_deploy.py | 4 +- 14 files changed, 90 insertions(+), 124 deletions(-) delete mode 100644 src/slippinj/cli/interactive/default_configuration.py diff --git a/requirements.txt b/requirements.txt index 238bb20..28269c1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,4 @@ pymssql==2.1.3 psycopg2==2.7.3.1 pyyaml==3.11 pymysql==0.7.11 -cx_Oracle==6.0b2 +cx_Oracle==6.1 diff --git a/setup.py b/setup.py index 6c54258..80d62df 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ setup( name='slippinj', - version='2.1.0', + version='3.0.0', author='Data Architects SCM Spain', author_email='data.architecture@scmspain.com', packages=find_packages('src'), diff --git a/src/slippinj/cli/interactive/default_configuration.py b/src/slippinj/cli/interactive/default_configuration.py deleted file mode 100644 index 24d4d47..0000000 --- a/src/slippinj/cli/interactive/default_configuration.py +++ /dev/null @@ -1,53 +0,0 @@ -from injector import inject, AssistedBuilder -from slippinj.cli.deploy_configuration import DeployConfiguration - - -class DefaultConfiguration(object): - """Get the default configuration for the workflows""" - - @inject(deploy_configuration=AssistedBuilder(DeployConfiguration), configuration_parser='configuration_parser') - def __init__(self, deploy_configuration, configuration_parser): - """ - Initialize the class - :param deploy_configuration: DeployConfiguration - :param configuration_parser: ConfigParser - """ - super(DefaultConfiguration, self).__init__() - - self.__deploy_configuration = deploy_configuration - self.__configuration_parser = configuration_parser - - def get(self, environment, arguments, workflow_configuration): - """ - Get configuration parameters that are common to the workflows - :param environment: string - :param arguments: Namespace - :param workflow_configuration: dict - :return: dict - """ - default_variables = ['swamp_bucket', 'hdfs_deploy_folder'] - default_configuration = {} - interactive_provided = False - deploy_configuration = self.__deploy_configuration.build(environment=environment, - configuration_parser=self.__configuration_parser) - args = vars(arguments) - - for variable in default_variables: - if variable in args and False != args[variable]: - default_configuration[variable] = args[variable] - interactive_provided = True - elif variable in workflow_configuration: - default_configuration[variable] = workflow_configuration[variable] - elif deploy_configuration.get(variable): - default_configuration[variable] = deploy_configuration.get(variable) - else: - default_configuration[variable] = raw_input( - 'Please, provide the {var_name} value: '.format(var_name=variable.replace('-', ' '))) - interactive_provided = True - - if interactive_provided and 'y' == ( - raw_input('Would you like to save the provided information in the config file: [Y/N] ')).lower(): - for key in default_configuration: - deploy_configuration.set(key, default_configuration[key]) - - return default_configuration diff --git a/src/slippinj/cli/scripts/anabasii.py b/src/slippinj/cli/scripts/anabasii.py index fda6422..5036606 100644 --- a/src/slippinj/cli/scripts/anabasii.py +++ b/src/slippinj/cli/scripts/anabasii.py @@ -1,7 +1,5 @@ import os -import sys - from .basic_script import BasicScript @@ -31,6 +29,13 @@ def get_arguments(self): 'long': '--wf-dir', 'help': 'Folder where all the workflows files are present', 'required': True + }, + { + 'short': '-l', + 'long': '--local-mode', + 'action': 'store_true', + 'help': 'If set, we assume that we\'re on the cluster and no remote connection needs to be made', + 'default': False } ] @@ -42,19 +47,29 @@ def run(self, args, injector): """ logger = injector.get('logger') + cluster_id = '' + + deploy_folder = args.hdfs_deploy_folder + if args.script not in __name__: logger.info('Getting workflow configuration') configuration = self.get_wf_configuration(args, injector) - if 'cluster_id' not in configuration: - configuration.cluster_id = args.cluster_id if False != args.cluster_id else injector.get('interactive_cluster_id').get() - - cluster_id = configuration.cluster_id + if args.local_mode == False: + if 'cluster_id' not in configuration: + configuration.cluster_id = args.cluster_id if False != args.cluster_id else injector.get('interactive_cluster_id').get() + cluster_id = configuration.cluster_id + if 'hdfs_deploy_folder' in configuration: + deploy_folder = configuration.hdfs_deploy_folder wf_compiled_dir = configuration.output_directory if configuration.output_directory else args.wf_dir else: wf_compiled_dir = args.wf_dir - configuration = injector.get('interactive_default_configuration').get('devel', args, {}) - cluster_id = args.cluster_id if False != args.cluster_id else injector.get('interactive_cluster_id').get() + if args.local_mode == False: + cluster_id = args.cluster_id if False != args.cluster_id else injector.get('interactive_cluster_id').get() logger.info('Uploading {wf_dir} to the cluster {cluster_id}'.format(wf_dir=wf_compiled_dir, cluster_id=cluster_id)) - injector.get('emr_deploy').upload_code(wf_compiled_dir, cluster_id, configuration['hdfs_deploy_folder'], - args.wf_dir.strip(os.sep).split(os.sep)[-1]) + + workflow_folder_name = 'workflow' + if args.wf_dir: + workflow_folder_name = args.wf_dir.strip(os.sep).split(os.sep)[-1] + + injector.get('emr_deploy').upload_code(wf_compiled_dir, cluster_id, deploy_folder, workflow_folder_name) diff --git a/src/slippinj/cli/scripts/basic_script.py b/src/slippinj/cli/scripts/basic_script.py index 0f88c02..e2dbd76 100644 --- a/src/slippinj/cli/scripts/basic_script.py +++ b/src/slippinj/cli/scripts/basic_script.py @@ -48,11 +48,6 @@ def get_wf_configuration(self, args, injector): configuration = injector.get('wf_configuration').get_workflow_configuration(configuration_file) - configuration = dict( - injector.get('interactive_default_configuration').get('devel', args, configuration).items() - + configuration.items() - ) - configuration['config_paths'] = configuration_file for key in configuration: diff --git a/src/slippinj/cli/scripts/cooper.py b/src/slippinj/cli/scripts/cooper.py index d7802e9..df06bfd 100644 --- a/src/slippinj/cli/scripts/cooper.py +++ b/src/slippinj/cli/scripts/cooper.py @@ -19,8 +19,13 @@ def get_arguments(self): { 'short': '-w', 'long': '--wf-dir', - 'help': 'Folder where all the workflows files are present', - 'required': True + 'help': 'Folder where all the workflows files are present' + }, + { + 'short': '-j', + 'long': '--job-file-name', + 'help': 'If set, we assume that it is the name of the workflow job configuration file, if not it will ask for the present ones', + 'default': False } ] @@ -32,9 +37,10 @@ def run(self, args, injector): """ configuration = self.get_wf_configuration(args, injector) wf_compiled_dir = configuration.output_directory if configuration.output_directory else args.wf_dir - cluster_id = configuration.cluster_id if configuration.cluster_id else args.cluster_id + cluster_id = configuration.cluster_id if 'cluster_id' in configuration else args.cluster_id + + properties_file = wf_compiled_dir + '/' + args.job_file_name if args.job_file_name != False else injector.get('interactive_properties_file').get(wf_compiled_dir) - properties_file = injector.get('interactive_properties_file').get(wf_compiled_dir) if properties_file: injector.get('logger').info( 'Running {properties_file} in cluster {cluster_id}'.format(properties_file=properties_file, diff --git a/src/slippinj/cli/scripts/tlacuilo.py b/src/slippinj/cli/scripts/tlacuilo.py index ab23d72..c4a7761 100644 --- a/src/slippinj/cli/scripts/tlacuilo.py +++ b/src/slippinj/cli/scripts/tlacuilo.py @@ -16,8 +16,7 @@ def get_arguments(self): { 'short': '-w', 'long': '--wf-dir', - 'help': 'Folder where all the workflows files are present', - 'required': True + 'help': 'Folder where all the workflows files are present' }, { 'short': '-t', @@ -37,28 +36,23 @@ def get_arguments(self): 'action': 'append' }, { - 'short': '-c', + 'short': '-f', 'long': '--configuration-file', 'help': 'File where all the configuration is stored' }, - { - 'short': '-b', - 'long': '--swamp-bucket', - 'help': 'S3 bucket where the data swamp tables are stored', - 'default': False - }, - { - 'short': '-f', - 'long': '--hdfs-deploy-folder', - 'help': 'Folder where all the code will be deployed on HDFS', - 'default': False - }, { 'short': '-i', 'long': '--cluster-information', 'action': 'store_true', 'help': 'Ask interactively for cluster information in order to replace variables in the template', 'default': False + }, + { + 'short': '-l', + 'long': '--local-mode', + 'action': 'store_true', + 'help': 'If set, we assume that we\'re on the cluster and no remote connection needs to be made', + 'default': False } ] diff --git a/src/slippinj/di.py b/src/slippinj/di.py index 03bf894..1420dab 100644 --- a/src/slippinj/di.py +++ b/src/slippinj/di.py @@ -9,7 +9,6 @@ from .cli.interactive.cluster_id import ClusterId from .cli.interactive.configuration_file import ConfigurationFile -from .cli.interactive.default_configuration import DefaultConfiguration from .cli.interactive.properties_file import PropertiesFile from .cli.objects.wf_configuration_object import WfConfigurationObject from .cli.scripts.tables_configuration.tables_configuration import TablesConfiguration @@ -44,7 +43,6 @@ def configure(self, binder): binder.bind('hdfs', to=HDFSFilesystem) binder.bind('interactive_cluster_id', to=ClusterId, scope=singleton) binder.bind('interactive_configuration_file', to=ConfigurationFile, scope=singleton) - binder.bind('interactive_default_configuration', to=DefaultConfiguration) binder.bind('interactive_properties_file', to=PropertiesFile) binder.bind('job_flow', to=RunJobFlow) binder.bind('job_flow_configuration', to=JobFlowConfigurationParser) diff --git a/src/slippinj/emr/cluster.py b/src/slippinj/emr/cluster.py index 3c65b17..5abec24 100644 --- a/src/slippinj/emr/cluster.py +++ b/src/slippinj/emr/cluster.py @@ -2,6 +2,7 @@ from slippinj.cli.ssh import SSHClient +import subprocess class EmrCluster(object): """Handle all EMR cluster information and configuration""" @@ -70,12 +71,15 @@ def exec_command(self, command, cluster_id, stop_on_error=False): :param stop_on_error: boolean :return: string """ - cluster_information = self.get_cluster_information(cluster_id) self.__logger.debug( 'Executing command {command} in cluster {cluster_id}'.format(command=command, cluster_id=cluster_id)) - return self.__ssh_client.exec_command(command, cluster_information['public_dns'], + if cluster_id: + cluster_information = self.get_cluster_information(cluster_id) + return self.__ssh_client.exec_command(command, cluster_information['public_dns'], cluster_information['key_name'], stop_on_error) + else: + return subprocess.check_output(command, shell=True) def open_sftp(self, cluster_id): """ diff --git a/src/slippinj/emr/deploy.py b/src/slippinj/emr/deploy.py index 3f5b78f..6a68291 100644 --- a/src/slippinj/emr/deploy.py +++ b/src/slippinj/emr/deploy.py @@ -2,7 +2,6 @@ import os.path - class EmrDeploy(object): """Execute all the deploy process inside an EMR cluster""" @@ -28,18 +27,18 @@ def run_properties_file(self, properties_file, cluster_id): :param cluster_id: string :return: string """ - remote_properties_path = os.path.join(self._base_remote_dir, os.path.basename(properties_file)) - - self.__emr_cluster.open_sftp(cluster_id).put(properties_file, remote_properties_path) + remote_properties_path = properties_file + if cluster_id: + remote_properties_path = os.path.join(self._base_remote_dir, os.path.basename(properties_file)) + self.__emr_cluster.open_sftp(cluster_id).put(properties_file, remote_properties_path) job_id = self.__emr_cluster.exec_command('oozie job -run -config ' + remote_properties_path, cluster_id, stop_on_error=True) - self.__emr_cluster.exec_command('rm ' + remote_properties_path, cluster_id) return job_id - def upload_code(self, wf_folder, cluster_id, hdfs_deploy_folder, workflow_name=None): + def upload_code(self, wf_folder, cluster_id, hdfs_deploy_folder, workflow_name): """ Upload given workflow code to HDFS connecting to given cluster :param wf_folder: string @@ -48,25 +47,32 @@ def upload_code(self, wf_folder, cluster_id, hdfs_deploy_folder, workflow_name=N :param workflow_name: string :return: boolean """ - basename = wf_folder.strip(os.sep).split(os.sep)[-1] if not workflow_name else workflow_name - tar_file = self.__filesystem.generate_tar_file(wf_folder, basename) + if cluster_id: + tar_file = self.__filesystem.generate_tar_file(wf_folder, workflow_name) + + remote_file = os.path.join(self._base_remote_dir, workflow_name + '.tar.gz') - remote_file = os.path.join(self._base_remote_dir, basename + '.tar.gz') + self.__emr_cluster.exec_command('rm -Rf ' + self._base_remote_dir, cluster_id) - sftp_client = self.__emr_cluster.open_sftp(cluster_id) - self.__emr_cluster.exec_command('rm -Rf ' + self._base_remote_dir, cluster_id) + sftp_client = self.__emr_cluster.open_sftp(cluster_id) - try: - sftp_client.mkdir(self._base_remote_dir) - sftp_client.put(tar_file, remote_file) - except IOError: - return False + try: + sftp_client.mkdir(self._base_remote_dir) + sftp_client.put(tar_file, remote_file) + except IOError: + return False - self.__emr_cluster.exec_command('tar --directory ' + self._base_remote_dir + ' -zxf ' + remote_file, cluster_id) + self.__emr_cluster.exec_command('tar --directory ' + self._base_remote_dir + ' -zxf ' + remote_file, cluster_id) self.__hdfs.rmdir(hdfs_deploy_folder, cluster_id) self.__hdfs.mkdir(hdfs_deploy_folder, cluster_id) - self.__hdfs.put(os.path.join(self._base_remote_dir, basename, '*'), hdfs_deploy_folder, cluster_id) + + source_path = os.path.join(wf_folder, '*') + + if cluster_id: + source_path = os.path.join(self._base_remote_dir, workflow_name, '*') + + self.__hdfs.put(source_path, hdfs_deploy_folder, cluster_id) return True diff --git a/tests/slippinj/cli/scripts/test_anabasii.py b/tests/slippinj/cli/scripts/test_anabasii.py index 662beb9..20c8be0 100644 --- a/tests/slippinj/cli/scripts/test_anabasii.py +++ b/tests/slippinj/cli/scripts/test_anabasii.py @@ -14,7 +14,7 @@ def test_script_can_be_configured(self): Anabasii(mocked_args_parser).configure() - assert 3 == mocked_args_parser.add_argument.call_count + assert 4 == mocked_args_parser.add_argument.call_count def test_script_is_executable_when_cluster_id_has_not_been_provided_not_standalone_run(self): mocked_interactive_cluster_id = Mock() @@ -32,6 +32,7 @@ def test_script_is_executable_when_cluster_id_has_not_been_provided_not_standalo mocked_args.cluster_id = False mocked_args.wf_dir = 'test' mocked_args.hdfs_deploy_folder = 'test' + mocked_args.local_mode = False mocked_args.script = 'hersir' Anabasii(Mock()).run(mocked_args, mocked_injector) @@ -54,6 +55,7 @@ def test_script_is_executable_when_cluster_id_has_not_been_provided_but_added_on mocked_args.cluster_id = False mocked_args.wf_dir = 'test' mocked_args.hdfs_deploy_folder = 'test' + mocked_args.local_mode = False mocked_args.script = 'hersir' Anabasii(Mock()).run(mocked_args, mocked_injector) @@ -67,18 +69,15 @@ def test_script_is_executable_when_cluster_id_has_not_been_provided_standalone_r mocked_emr_deploy = Mock() mocked_emr_deploy.upload_code = Mock(return_value=True) - mocked_interactive_configuration = Mock() - mocked_interactive_configuration.get = Mock(return_value={'hdfs_deploy_folder': 'test'}) - mocked_injector = Mock() mocked_injector.get = Mock( - side_effect=[self.__generate_test_logger(), mocked_interactive_configuration, - mocked_interactive_cluster_id, mocked_emr_deploy]) + side_effect=[self.__generate_test_logger(), mocked_interactive_cluster_id, mocked_emr_deploy]) mocked_args = Mock() mocked_args.cluster_id = False mocked_args.wf_dir = 'test' mocked_args.hdfs_deploy_folder = 'test' + mocked_args.local_mode = False mocked_args.script = 'anabasii' Anabasii(Mock()).run(mocked_args, mocked_injector) @@ -100,6 +99,7 @@ def test_script_is_executable_when_cluster_id_has_been_provided_not_standalone_r mocked_args.cluster_id = 'test' mocked_args.wf_dir = 'test' mocked_args.hdfs_deploy_folder = 'test' + mocked_args.local_mode = False mocked_args.script = 'hersir' Anabasii(Mock()).run(mocked_args, mocked_injector) @@ -114,17 +114,16 @@ def test_script_is_executable_when_cluster_id_has_been_provided_standalone_run(s mocked_emr_deploy = Mock() mocked_emr_deploy.upload_code = Mock(return_value=True) - mocked_interactive_configuration = Mock() - mocked_interactive_configuration.get = Mock(return_value={'hdfs_deploy_folder': 'test'}) mocked_injector = Mock() mocked_injector.get = Mock( - side_effect=[self.__generate_test_logger(), mocked_interactive_configuration, mocked_emr_deploy]) + side_effect=[self.__generate_test_logger(), mocked_emr_deploy]) mocked_args = Mock() mocked_args.cluster_id = 'test' mocked_args.wf_dir = 'test' mocked_args.hdfs_deploy_folder = 'test' + mocked_args.local_mode = False mocked_args.script = 'anabasii' Anabasii(Mock()).run(mocked_args, mocked_injector) diff --git a/tests/slippinj/cli/scripts/test_cooper.py b/tests/slippinj/cli/scripts/test_cooper.py index 03ce577..e5e0774 100644 --- a/tests/slippinj/cli/scripts/test_cooper.py +++ b/tests/slippinj/cli/scripts/test_cooper.py @@ -14,7 +14,7 @@ def test_script_can_be_configured(self): Cooper(mocked_args_parser).configure() - assert 2 == mocked_args_parser.add_argument.call_count + assert 3 == mocked_args_parser.add_argument.call_count def test_script_is_executable_successfully(self): mocked_interactive_properties_file = Mock() @@ -31,6 +31,7 @@ def test_script_is_executable_successfully(self): mocked_args = Mock() mocked_args.wf_dir = 'test' mocked_args.cluster_id = 'test' + mocked_args.job_file_name = False Cooper(Mock()).run(mocked_args, mocked_injector) @@ -51,6 +52,7 @@ def test_script_execution_when_no_properties_file_is_selected(self): mocked_args = Mock() mocked_args.wf_dir = 'test' mocked_args.cluster_id = 'test' + mocked_args.job_file_name = False mocked_emr_deploy.run_properties_file.assert_not_called() diff --git a/tests/slippinj/cli/scripts/test_tlacuilo.py b/tests/slippinj/cli/scripts/test_tlacuilo.py index 38010a5..401c955 100644 --- a/tests/slippinj/cli/scripts/test_tlacuilo.py +++ b/tests/slippinj/cli/scripts/test_tlacuilo.py @@ -15,7 +15,7 @@ def test_script_can_be_configured(self): Tlacuilo(mocked_args_parser).configure() - assert 8 == mocked_args_parser.add_argument.call_count + assert 7 == mocked_args_parser.add_argument.call_count def test_script_raise_an_error_when_path_is_not_absolute(self): mocked_args = Mock() diff --git a/tests/slippinj/emr/test_deploy.py b/tests/slippinj/emr/test_deploy.py index 53a5407..13c4569 100644 --- a/tests/slippinj/emr/test_deploy.py +++ b/tests/slippinj/emr/test_deploy.py @@ -32,7 +32,7 @@ def test_code_is_uploaded_succesfully(self): mocked_hdfs.mkdir = Mock(return_value=True) mocked_hdfs.put = Mock(return_value=True) - assert True == EmrDeploy(mocked_emr_cluster, mocked_hdfs, mocked_filesystem).upload_code('test', 'test', 'test') + assert True == EmrDeploy(mocked_emr_cluster, mocked_hdfs, mocked_filesystem).upload_code('test', 'test', 'test', 'test') def test_code_upload_fails_when_creating_remote_directory(self): mocked_sftp_client = Mock() @@ -54,4 +54,4 @@ def test_code_upload_fails_when_creating_remote_directory(self): mocked_hdfs.put = Mock(return_value=True) assert False == EmrDeploy(mocked_emr_cluster, mocked_hdfs, mocked_filesystem).upload_code('test', 'test', - 'test') + 'test', 'test')