From c3a183e4dd7e1a09a4f66b5db6dea8b69b53fbbe Mon Sep 17 00:00:00 2001
From: Maxi Schulz <maximilian.schulz@hs-hannover.de>
Date: Thu, 9 Oct 2014 11:24:46 +0200
Subject: [PATCH] [TASK] Introduces dry run mode

You can now run hshetl with the --dry parameter so that nothing is loaded
into the target. Only the queries that would be executed are shown in the
log. Per-job dry run mode will come later (for now, either all jobs run
dry or none do).
---
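Note for reviewers (not part of the commit message): the SQL dry-run log
output relies on the new _generate_human_readable_query() helper, which
substitutes SQLAlchemy bind-parameter placeholders (":column" / ":column_1")
with the record's values so the log shows a plain, readable statement.
Below is a minimal standalone sketch of that substitution idea; the sample
query, values and prop_types dictionaries are made up for illustration and
do not come from hshetl.

    # Standalone sketch of the placeholder substitution used for the
    # dry-run log output. Query, values and prop_types are illustrative.
    query = 'UPDATE users SET email=:email WHERE users.id = :id_1'
    values = {'email': 'max@example.org', 'id': 42}
    prop_types = {'email': 'string', 'id': 'integer'}

    for column, value in values.items():
        if prop_types.get(column) == 'string':
            replacement = "'{0}'".format(value)        # quote string values
        elif prop_types.get(column) == 'binary':
            replacement = '<binary_data>'              # never inline raw binary
        else:
            replacement = str(value)
        placeholder = ':{0}_1'.format(column)          # WHERE-clause binds get a _1 suffix
        if placeholder not in query:
            placeholder = ':{0}'.format(column)
        query = query.replace(placeholder, replacement)

    print(query)  # UPDATE users SET email='max@example.org' WHERE users.id = 42
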
 .gitignore        |  1 +
 hshetl/cli.py     | 12 +++++++
 hshetl/jobs.py    | 13 ++++---
 hshetl/loaders.py | 90 ++++++++++++++++++++++++++++++++++-------------
 4 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8ffa891..2122402 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,4 @@ dist/
 *.asta.bak
 *.asta.lock
 .metadata/
+distribute-0.7.3-py2.7.egg
diff --git a/hshetl/cli.py b/hshetl/cli.py
index 863969b..3e5bb02 100644
--- a/hshetl/cli.py
+++ b/hshetl/cli.py
@@ -104,6 +104,7 @@ def _argparse(config_file = None, working_directory = None):
     parser.add_argument('-i', '--interactive', action = 'store_true', help = 'Ask before each job execution.')
     parser.add_argument('-l', '--logformat', dest = 'logformat', help = 'The format of the logging output.', default = '%(levelname)s: %(message)s')
     parser.add_argument('-p', '--profile', action = 'store_true')
+    parser.add_argument('--dry', action = 'store_true', help = 'Build all queries and extract the data, but do NOT write (load) anything into the target.')
     return parser
 
 
@@ -136,6 +137,7 @@ class Controller(object):
         self.explain = args.explain
         self.interactive = args.interactive
         self.profile = args.profile
+        self.dry = args.dry
         logging.basicConfig(stream = sys.stdout, level = logging.DEBUG if args.verbose else logging.INFO, format = args.logformat)
         logging.debug('I am chatty now! Lets shovel data :)')
 
@@ -180,6 +182,8 @@ class Controller(object):
         '''Build job objects from configuration and execute them.'''
         if self.interactive:
             self.make_interactive(self.jobs)
+        if self.dry:
+            self.make_dry(self.jobs)
         try:
             if self.explain:
                 self.explain_jobs()
@@ -219,6 +223,14 @@ class Controller(object):
             if isinstance(job, JobList):
                 self.make_interactive(job)
 
+    def make_dry(self, jobs):
+        '''Applies dry-run mode to all jobs.'''
+        for job in jobs:
+            logging.debug('Mark job as dry-run...')
+            job.dry = True
+            if isinstance(job, JobList):
+                self.make_dry(job)
+
     def explain_jobs(self):
         '''Traverse the job tree and print out jobs name and description in a nice readable format.'''
         self.render_tree_lvl(self.jobs)
diff --git a/hshetl/jobs.py b/hshetl/jobs.py
index ae95ba1..f042024 100644
--- a/hshetl/jobs.py
+++ b/hshetl/jobs.py
@@ -42,7 +42,7 @@ import sys
 from connectors import SqlQueryLanguageSupportMixin, SqlAlchemyConnector, connector_repository
 from extractors import AbstractExtractor
 from transformers import CompareTransformer, AbstractTransformer
-from loaders import AbstractLoader
+from loaders import AbstractLoader, CsvLoader
 from entities import entity_repository, Container, Result, COLLISION_HANDLING_BREAKALL
 from exc import ConfigurationException, JobException
 
@@ -84,7 +84,7 @@ class Job(object):
     description = u'A Job that does nothing'
     '''The default description of the job.'''
 
-    def __init__(self, name = None, description = None, interactive = False):
+    def __init__(self, name = None, description = None, interactive = False, dry = False):
         '''Create a job instance.
 
         Parameters
@@ -102,6 +102,8 @@ class Job(object):
         '''The description of the job.'''
         self.interactive = interactive
         '''Indicates whether the job will prompt before its execution, or not.'''
+        self.dry = dry
+        '''Indicates whether the job only logs the load queries (True) instead of executing them (False).'''
         self.executed = False
         '''Indicates if the job was executed'''
 
@@ -657,7 +659,7 @@ class LoadJob(EntityJob, ConnectorJob):
     '''The default description of this job.'''
 
     def __init__(self, loader, source, *args, **kwargs):
-        '''The constructor for extraction jobs.'''
+        '''The constructor for load jobs.'''
         super(LoadJob, self).__init__(*args, **kwargs)
         if not isinstance(loader, AbstractLoader):
             raise ConfigurationException('The loader needs to be an instance of hshetl.loaders.AbstractLoader. Encoutered: {}'.format(str(type(loader))))
@@ -672,7 +674,10 @@ class LoadJob(EntityJob, ConnectorJob):
         '''Load data from the repository into a storage interface.'''
         result = Result(entity = self.entity, source = self.source)
         logging.info(unicode(result))
-        self.loader.execute(result)
+        if isinstance(self.loader, CsvLoader):
+            self.loader.execute(result)
+        else:
+            self.loader.execute(result, dry = self.dry)
 
 
 @yamlify
diff --git a/hshetl/loaders.py b/hshetl/loaders.py
index 204d6b8..11fc0a9 100644
--- a/hshetl/loaders.py
+++ b/hshetl/loaders.py
@@ -72,7 +72,7 @@ class AbstractLoader(object):
         '''
         return isinstance(connector, AbstractConnector)
 
-    def execute(self, result):
+    def execute(self, result, dry = False):
         '''Executes the loader.
 
         :param hshetl.entities.Result result:
@@ -82,10 +82,10 @@ class AbstractLoader(object):
         if not self.can_execute(self.connector):
             raise LoaderException('The loader {} can not be executed using this connector {}'.format(self, self.connector))
         logging.debug('Loads data: ' + self.__class__.__name__)
-        self._execute(result)
+        self._execute(result, dry)
         logging.debug('Load successfully finished')
 
-    def _execute(self, result):
+    def _execute(self, result, dry = False):
         '''Executes the loading of data. Distinguishes between update, insert and delete.'''
         for action in self.operations:
             getattr(self, '_' + action)(getattr(result, action))
@@ -214,7 +214,7 @@ class SqlAlchemyLoader(AbstractLoader):
         '''Defines which connector can be handled by this extractor.'''
         return isinstance(connector, SqlAlchemyConnector)
 
-    def _execute(self, result):
+    def _execute(self, result, dry = False):
         '''Executes the loading of data. Distinguishes between update, insert and delete'''
         logging.debug('Will execute the following operations: %s' % self.operations)
         self.table = Table(self.table_name,
@@ -224,7 +224,7 @@ class SqlAlchemyLoader(AbstractLoader):
         logging.info('Successfully reflected table ' + self.table_name)
         with self.connector as connection:
             for action in self.operations:
-                getattr(self, '_' + action)(connection, getattr(result, action))
+                getattr(self, '_' + action)(connection, getattr(result, action), dry)
         logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)),
                      'Done!' if 'insert' in self.operations else 'Skipped!',
                      str(len(result.update)),
@@ -232,32 +232,41 @@ class SqlAlchemyLoader(AbstractLoader):
                      str(len(result.delete)),
                      'Done!' if 'delete' in self.operations else 'Skipped!'))
 
-    def _insert(self, connection, data):
+    def _insert(self, connection, data, dry):
         '''Creates a sql insert statement using sqlalchemy and executes it on the connection.'''
         if len(data) == 0:
             return
         values = []
         for record in data:
             values.append(record.to_dict(remove_system_identifiers = False))
         statement = self.table.insert()
-        logging.info('Executing {} with {} '.format(str(statement), str(values)))
-        connection.execute(statement, values)
+        if dry:
+            logging.info('Sql insert query that would be executed: ' + self._generate_human_readable_query(statement, record))
+        else:
+            logging.info('Executing sql insert query: ' + self._generate_human_readable_query(statement, record))
+            connection.execute(statement, values)
 
-    def _update(self, connection, data):
+    def _update(self, connection, data, dry):
         '''Creates a sql update statement using sqlalchemy and executes it on the connection.'''
         for record in data:
             statement = self.table.update(values = record.to_dict())
             statement = self._add_where_constraints(statement, record)
-            logging.info('Executing sql update query: ' + str(statement))
-            connection.execute(statement)
+            if dry:
+                logging.info('Sql update query that would be executed: ' + self._generate_human_readable_query(statement, record))
+            else:
+                logging.info('Executing sql update query: ' + self._generate_human_readable_query(statement, record))
+                connection.execute(statement)
 
-    def _delete(self, connection, data):
+    def _delete(self, connection, data, dry):
         '''Creates a sql delete statement using sqlalchemy and executes it on the connection.'''
         for record in data:
             statement = self.table.delete()
             statement = self._add_where_constraints(statement, record)
-            logging.info('Executing sql delete query: ' + str(statement))
-            connection.execute(statement)
+            if dry:
+                logging.info('Sql delete query that would be executed: ' + self._generate_human_readable_query(statement, record))
+            else:
+                logging.info('Executing sql delete query: ' + self._generate_human_readable_query(statement, record))
+                connection.execute(statement)
 
     def _add_where_constraints(self, statement, record):
         '''Resolves the identifiers for the record and adds the column = value constraints.
@@ -276,6 +285,28 @@ class SqlAlchemyLoader(AbstractLoader):
                 raise ConfigurationException('Value for identifier \'' + str(column) + '\' empty.')
         return statement
 
+    def _generate_human_readable_query(self, statement, record):
+        '''Renders a human readable sql query (with the bound values inlined) from the given statement and record.'''
+        query = str(statement)
+        values = record.to_dict(remove_system_identifiers = False)
+        prop_types = record.properties
+        for column in values.keys():
+            try:
+                prop_types[column]
+            except KeyError:
+                prop_types[column] = 'unknown because mapped'
+            if prop_types[column] == 'string':
+                replace_value = '\'' + unicode(values[column]) + '\''
+            elif prop_types[column] == 'binary':
+                replace_value = '<binary_data>'
+            else:
+                replace_value = unicode(values[column])
+            if ':' + column + '_1' in query:
+                query = query.replace(':' + column + '_1', replace_value)
+            else:
+                query = query.replace(':' + column, replace_value)
+        return query
+
 
 @yamlify
 class LdapLoader(AbstractLoader):
@@ -322,14 +353,14 @@ class LdapLoader(AbstractLoader):
         '''Defines which connector can be handled by this extractor.'''
         return isinstance(connector, LdapConnector)
 
-    def _execute(self, result):
+    def _execute(self, result, dry):
         '''Executes the loading of data. Distinguishes between update, insert and delete.'''
         logging.debug('Loads data: ' + self.__class__.__name__)
         if None in (self.rdn, self.base, self.objectClass):
             raise ConfigurationException('base, rdn or objectClass not found - needs to be configured in the connector or loader!')
         with self.connector as connection:
             for action in self.operations:
-                getattr(self, '_' + action)(connection, getattr(result, action))
+                getattr(self, '_' + action)(connection, getattr(result, action), dry)
         logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)),
                      'Done!' if 'insert' in self.operations else 'Skipped!',
                      str(len(result.update)),
@@ -337,7 +368,7 @@ class LdapLoader(AbstractLoader):
                      str(len(result.delete)),
                      'Done!' if 'delete' in self.operations else 'Skipped!'))
 
-    def _update(self, connection, data):
+    def _update(self, connection, data, dry):
         '''Updates every dataset one after another.'''
         for record in data:
             old_entity_dict = self._prepare_data(record.to_dict(container = record.last_container.target_container))
@@ -346,30 +377,39 @@ class LdapLoader(AbstractLoader):
             dn = self._get_dn(record.get_container_identifier(), dict_data)
             attributes = modlist.modifyModlist(old_entity_dict, entity_dict)
             try:
-                connection.modify_s(dn, attributes)
-                logging.info('Modified dn {} with {} '.format(dn, str(attributes)))
+                if dry:
+                    logging.info('Would modify dn {} with {} '.format(dn, str(attributes)))
+                else:
+                    connection.modify_s(dn, attributes)
+                    logging.info('Modified dn {} with {} '.format(dn, str(attributes)))
             except Exception, e:
                 logging.warn('Update failed for dn \'' + dn + '\': ' + str(e))
 
-    def _insert(self, connection, data):
+    def _insert(self, connection, data, dry):
         '''Inserts every dataset one after another.'''
         for record in data:
             entity_dict = self._prepare_data(record.to_dict())
             dn = self._get_dn(record.get_container_identifier(), record.to_dict())
             attributes = modlist.addModlist(entity_dict)
             try:
-                connection.add_s(dn, attributes)
-                logging.info('Created dn {} with {} '.format(dn, str(attributes)))
+                if dry:
+                    logging.info('Would create dn {} with {} '.format(dn, str(attributes)))
+                else:
+                    connection.add_s(dn, attributes)
+                    logging.info('Created dn {} with {} '.format(dn, str(attributes)))
             except Exception, e:
                 logging.warn('Insert failed for dn \'' + dn + '\': ' + str(e))
 
-    def _delete(self, connection, data):
+    def _delete(self, connection, data, dry):
         '''Deletes every entry one after another.'''
         for record in data:
             dn = self._get_dn(record.get_container_identifier(), record.to_dict())
             try:
-                connection.delete_s(dn)
-                logging.info('Deleted dn {}'.format(dn))
+                if dry:
+                    logging.info('Would delete dn {}'.format(dn))
+                else:
+                    connection.delete_s(dn)
+                    logging.info('Deleted dn {}'.format(dn))
             except Exception, e:
                 logging.warn('Delete failed for dn \'' + dn + '\': ' + str(e))
 
-- 
GitLab