Skip to content

Commit

Permalink
Release 3.0.0/Added 'local' mode for Hersir
Browse files Browse the repository at this point in the history
- Removed default_configuration.py to avoid coupling with our templates definition
- Removed the swamp & hdfs_deploy_folder params to avoid coupling; they should now be passed as --extra parameters
- Added -l / --local mode to run Hersir from inside EMR (no AWS or ssh calls are made)
- Added -j / --job-file-name to Cooper, to skip interactive mode when the .properties file to be run by Oozie is already known
- Updated requirements.txt to the new cx_Oracle version 6.1 (Dec 2017)
  • Loading branch information
vcapelca committed Dec 21, 2017
1 parent e396847 commit c6f8c64
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 124 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pymssql==2.1.3
psycopg2==2.7.3.1
pyyaml==3.11
pymysql==0.7.11
cx_Oracle==6.0b2
cx_Oracle==6.1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

setup(
name='slippinj',
version='2.1.0',
version='3.0.0',
author='Data Architects SCM Spain',
author_email='[email protected]',
packages=find_packages('src'),
Expand Down
53 changes: 0 additions & 53 deletions src/slippinj/cli/interactive/default_configuration.py

This file was deleted.

35 changes: 25 additions & 10 deletions src/slippinj/cli/scripts/anabasii.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os

import sys

from .basic_script import BasicScript


Expand Down Expand Up @@ -31,6 +29,13 @@ def get_arguments(self):
'long': '--wf-dir',
'help': 'Folder where all the workflows files are present',
'required': True
},
{
'short': '-l',
'long': '--local-mode',
'action': 'store_true',
'help': 'If set, we assume that we\'re on the cluster and no remote connection needs to be made',
'default': False
}
]

Expand All @@ -42,19 +47,29 @@ def run(self, args, injector):
"""
logger = injector.get('logger')

cluster_id = ''

deploy_folder = args.hdfs_deploy_folder

if args.script not in __name__:
logger.info('Getting workflow configuration')
configuration = self.get_wf_configuration(args, injector)
if 'cluster_id' not in configuration:
configuration.cluster_id = args.cluster_id if False != args.cluster_id else injector.get('interactive_cluster_id').get()

cluster_id = configuration.cluster_id
if args.local_mode == False:
if 'cluster_id' not in configuration:
configuration.cluster_id = args.cluster_id if False != args.cluster_id else injector.get('interactive_cluster_id').get()
cluster_id = configuration.cluster_id
if 'hdfs_deploy_folder' in configuration:
deploy_folder = configuration.hdfs_deploy_folder
wf_compiled_dir = configuration.output_directory if configuration.output_directory else args.wf_dir
else:
wf_compiled_dir = args.wf_dir
configuration = injector.get('interactive_default_configuration').get('devel', args, {})
cluster_id = args.cluster_id if False != args.cluster_id else injector.get('interactive_cluster_id').get()
if args.local_mode == False:
cluster_id = args.cluster_id if False != args.cluster_id else injector.get('interactive_cluster_id').get()

logger.info('Uploading {wf_dir} to the cluster {cluster_id}'.format(wf_dir=wf_compiled_dir, cluster_id=cluster_id))
injector.get('emr_deploy').upload_code(wf_compiled_dir, cluster_id, configuration['hdfs_deploy_folder'],
args.wf_dir.strip(os.sep).split(os.sep)[-1])

workflow_folder_name = 'workflow'
if args.wf_dir:
workflow_folder_name = args.wf_dir.strip(os.sep).split(os.sep)[-1]

injector.get('emr_deploy').upload_code(wf_compiled_dir, cluster_id, deploy_folder, workflow_folder_name)
5 changes: 0 additions & 5 deletions src/slippinj/cli/scripts/basic_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ def get_wf_configuration(self, args, injector):

configuration = injector.get('wf_configuration').get_workflow_configuration(configuration_file)

configuration = dict(
injector.get('interactive_default_configuration').get('devel', args, configuration).items()
+ configuration.items()
)

configuration['config_paths'] = configuration_file

for key in configuration:
Expand Down
14 changes: 10 additions & 4 deletions src/slippinj/cli/scripts/cooper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ def get_arguments(self):
{
'short': '-w',
'long': '--wf-dir',
'help': 'Folder where all the workflows files are present',
'required': True
'help': 'Folder where all the workflows files are present'
},
{
'short': '-j',
'long': '--job-file-name',
'help': 'If set, we assume that it is the name of the workflow job configuration file, if not it will ask for the present ones',
'default': False
}
]

Expand All @@ -32,9 +37,10 @@ def run(self, args, injector):
"""
configuration = self.get_wf_configuration(args, injector)
wf_compiled_dir = configuration.output_directory if configuration.output_directory else args.wf_dir
cluster_id = configuration.cluster_id if configuration.cluster_id else args.cluster_id
cluster_id = configuration.cluster_id if 'cluster_id' in configuration else args.cluster_id

properties_file = wf_compiled_dir + '/' + args.job_file_name if args.job_file_name != False else injector.get('interactive_properties_file').get(wf_compiled_dir)

properties_file = injector.get('interactive_properties_file').get(wf_compiled_dir)
if properties_file:
injector.get('logger').info(
'Running {properties_file} in cluster {cluster_id}'.format(properties_file=properties_file,
Expand Down
24 changes: 9 additions & 15 deletions src/slippinj/cli/scripts/tlacuilo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ def get_arguments(self):
{
'short': '-w',
'long': '--wf-dir',
'help': 'Folder where all the workflows files are present',
'required': True
'help': 'Folder where all the workflows files are present'
},
{
'short': '-t',
Expand All @@ -37,28 +36,23 @@ def get_arguments(self):
'action': 'append'
},
{
'short': '-c',
'short': '-f',
'long': '--configuration-file',
'help': 'File where all the configuration is stored'
},
{
'short': '-b',
'long': '--swamp-bucket',
'help': 'S3 bucket where the data swamp tables are stored',
'default': False
},
{
'short': '-f',
'long': '--hdfs-deploy-folder',
'help': 'Folder where all the code will be deployed on HDFS',
'default': False
},
{
'short': '-i',
'long': '--cluster-information',
'action': 'store_true',
'help': 'Ask interactively for cluster information in order to replace variables in the template',
'default': False
},
{
'short': '-l',
'long': '--local-mode',
'action': 'store_true',
'help': 'If set, we assume that we\'re on the cluster and no remote connection needs to be made',
'default': False
}
]

Expand Down
2 changes: 0 additions & 2 deletions src/slippinj/di.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from .cli.interactive.cluster_id import ClusterId
from .cli.interactive.configuration_file import ConfigurationFile
from .cli.interactive.default_configuration import DefaultConfiguration
from .cli.interactive.properties_file import PropertiesFile
from .cli.objects.wf_configuration_object import WfConfigurationObject
from .cli.scripts.tables_configuration.tables_configuration import TablesConfiguration
Expand Down Expand Up @@ -44,7 +43,6 @@ def configure(self, binder):
binder.bind('hdfs', to=HDFSFilesystem)
binder.bind('interactive_cluster_id', to=ClusterId, scope=singleton)
binder.bind('interactive_configuration_file', to=ConfigurationFile, scope=singleton)
binder.bind('interactive_default_configuration', to=DefaultConfiguration)
binder.bind('interactive_properties_file', to=PropertiesFile)
binder.bind('job_flow', to=RunJobFlow)
binder.bind('job_flow_configuration', to=JobFlowConfigurationParser)
Expand Down
8 changes: 6 additions & 2 deletions src/slippinj/emr/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from slippinj.cli.ssh import SSHClient

import subprocess

class EmrCluster(object):
"""Handle all EMR cluster information and configuration"""
Expand Down Expand Up @@ -70,12 +71,15 @@ def exec_command(self, command, cluster_id, stop_on_error=False):
:param stop_on_error: boolean
:return: string
"""
cluster_information = self.get_cluster_information(cluster_id)

self.__logger.debug(
'Executing command {command} in cluster {cluster_id}'.format(command=command, cluster_id=cluster_id))
return self.__ssh_client.exec_command(command, cluster_information['public_dns'],
if cluster_id:
cluster_information = self.get_cluster_information(cluster_id)
return self.__ssh_client.exec_command(command, cluster_information['public_dns'],
cluster_information['key_name'], stop_on_error)
else:
return subprocess.check_output(command, shell=True)

def open_sftp(self, cluster_id):
"""
Expand Down
42 changes: 24 additions & 18 deletions src/slippinj/emr/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import os.path


class EmrDeploy(object):
"""Execute all the deploy process inside an EMR cluster"""

Expand All @@ -28,18 +27,18 @@ def run_properties_file(self, properties_file, cluster_id):
:param cluster_id: string
:return: string
"""
remote_properties_path = os.path.join(self._base_remote_dir, os.path.basename(properties_file))

self.__emr_cluster.open_sftp(cluster_id).put(properties_file, remote_properties_path)
remote_properties_path = properties_file
if cluster_id:
remote_properties_path = os.path.join(self._base_remote_dir, os.path.basename(properties_file))
self.__emr_cluster.open_sftp(cluster_id).put(properties_file, remote_properties_path)

job_id = self.__emr_cluster.exec_command('oozie job -run -config ' + remote_properties_path, cluster_id,
stop_on_error=True)

self.__emr_cluster.exec_command('rm ' + remote_properties_path, cluster_id)

return job_id

def upload_code(self, wf_folder, cluster_id, hdfs_deploy_folder, workflow_name=None):
def upload_code(self, wf_folder, cluster_id, hdfs_deploy_folder, workflow_name):
"""
Upload given workflow code to HDFS connecting to given cluster
:param wf_folder: string
Expand All @@ -48,25 +47,32 @@ def upload_code(self, wf_folder, cluster_id, hdfs_deploy_folder, workflow_name=N
:param workflow_name: string
:return: boolean
"""
basename = wf_folder.strip(os.sep).split(os.sep)[-1] if not workflow_name else workflow_name

tar_file = self.__filesystem.generate_tar_file(wf_folder, basename)
if cluster_id:
tar_file = self.__filesystem.generate_tar_file(wf_folder, workflow_name)

remote_file = os.path.join(self._base_remote_dir, workflow_name + '.tar.gz')

remote_file = os.path.join(self._base_remote_dir, basename + '.tar.gz')
self.__emr_cluster.exec_command('rm -Rf ' + self._base_remote_dir, cluster_id)

sftp_client = self.__emr_cluster.open_sftp(cluster_id)
self.__emr_cluster.exec_command('rm -Rf ' + self._base_remote_dir, cluster_id)
sftp_client = self.__emr_cluster.open_sftp(cluster_id)

try:
sftp_client.mkdir(self._base_remote_dir)
sftp_client.put(tar_file, remote_file)
except IOError:
return False
try:
sftp_client.mkdir(self._base_remote_dir)
sftp_client.put(tar_file, remote_file)
except IOError:
return False

self.__emr_cluster.exec_command('tar --directory ' + self._base_remote_dir + ' -zxf ' + remote_file, cluster_id)
self.__emr_cluster.exec_command('tar --directory ' + self._base_remote_dir + ' -zxf ' + remote_file, cluster_id)

self.__hdfs.rmdir(hdfs_deploy_folder, cluster_id)
self.__hdfs.mkdir(hdfs_deploy_folder, cluster_id)
self.__hdfs.put(os.path.join(self._base_remote_dir, basename, '*'), hdfs_deploy_folder, cluster_id)

source_path = os.path.join(wf_folder, '*')

if cluster_id:
source_path = os.path.join(self._base_remote_dir, workflow_name, '*')

self.__hdfs.put(source_path, hdfs_deploy_folder, cluster_id)

return True
Loading

0 comments on commit c6f8c64

Please sign in to comment.