diff --git a/.gitignore b/.gitignore index 8ffa89179289635c197da6f3c2d803df4fb5c7e1..2122402ae4cd49e9f93ec954960307c2f1156294 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ dist/ *.asta.bak *.asta.lock .metadata/ +distribute-0.7.3-py2.7.egg diff --git a/hshetl/cli.py b/hshetl/cli.py index 863969bb8379e82a00137166c583746d16723773..3e5bb02b97847db2bcc34d762c514caea81ec614 100644 --- a/hshetl/cli.py +++ b/hshetl/cli.py @@ -104,6 +104,7 @@ def _argparse(config_file = None, working_directory = None): parser.add_argument('-i', '--interactive', action = 'store_true', help = 'Ask before each job execution.') parser.add_argument('-l', '--logformat', dest = 'logformat', help = 'The format of the logging output.', default = '%(levelname)s: %(message)s') parser.add_argument('-p', '--profile', action = 'store_true') + parser.add_argument('--dry', action = 'store_true', help = 'Build all queries and extract the data but do NOT write (load)') return parser @@ -136,6 +137,7 @@ class Controller(object): self.explain = args.explain self.interactive = args.interactive self.profile = args.profile + self.dry = args.dry logging.basicConfig(stream = sys.stdout, level = logging.DEBUG if args.verbose else logging.INFO, format = args.logformat) logging.debug('I am chatty now! 
Lets shovel data :)') @@ -180,6 +182,8 @@ class Controller(object): '''Build job objects from configuration and execute them.''' if self.interactive: self.make_interactive(self.jobs) + if self.dry: + self.make_dry(self.jobs) try: if self.explain: self.explain_jobs() @@ -219,6 +223,14 @@ class Controller(object): if isinstance(job, JobList): self.make_interactive(job) + def make_dry(self, jobs): + '''Applies dry-run mode to all jobs''' + for job in jobs: + logging.debug('Mark job as dry-run...') + job.dry = True + if isinstance(job, JobList): + self.make_dry(job) + def explain_jobs(self): '''Traverse the job tree and print out jobs name and description in a nice readable format.''' self.render_tree_lvl(self.jobs) diff --git a/hshetl/jobs.py b/hshetl/jobs.py index ae95ba1d96005ad2490d297c4bd7c560033e495b..f042024e353dc82f9c8dfe368eb4225e067a0214 100644 --- a/hshetl/jobs.py +++ b/hshetl/jobs.py @@ -42,7 +42,7 @@ import sys from connectors import SqlQueryLanguageSupportMixin, SqlAlchemyConnector, connector_repository from extractors import AbstractExtractor from transformers import CompareTransformer, AbstractTransformer -from loaders import AbstractLoader +from loaders import AbstractLoader, CsvLoader from entities import entity_repository, Container, Result, COLLISION_HANDLING_BREAKALL from exc import ConfigurationException, JobException @@ -84,7 +84,7 @@ class Job(object): description = u'A Job that does nothing' '''The default description of the job.''' - def __init__(self, name = None, description = None, interactive = False): + def __init__(self, name = None, description = None, interactive = False, dry = False): '''Create a job instance. 
Parameters @@ -102,6 +102,8 @@ class Job(object): '''The description of the job.''' self.interactive = interactive '''Indicates whether the job will prompt before its execution, or not.''' + self.dry = dry + '''Indicates whether the job will actually execute (False) load queries or not (True)''' self.executed = False '''Indicates if the job was executed''' @@ -657,7 +659,7 @@ class LoadJob(EntityJob, ConnectorJob): '''The default description of this job.''' def __init__(self, loader, source, *args, **kwargs): - '''The constructor for extraction jobs.''' + '''The constructor for load jobs.''' super(LoadJob, self).__init__(*args, **kwargs) if not isinstance(loader, AbstractLoader): raise ConfigurationException('The loader needs to be an instance of hshetl.loaders.AbstractLoader. Encoutered: {}'.format(str(type(loader)))) @@ -672,7 +674,10 @@ class LoadJob(EntityJob, ConnectorJob): '''Load data from the repository into a storage interface.''' result = Result(entity = self.entity, source = self.source) logging.info(unicode(result)) - self.loader.execute(result) + if isinstance(self.loader, CsvLoader): + self.loader.execute(result) + else: + self.loader.execute(result, dry = self.dry) @yamlify diff --git a/hshetl/loaders.py b/hshetl/loaders.py index 204d6b87b8136cf11848c595270f6c765fd65855..11fc0a988d4123ac41f405d9bf6b1aa2d13ccca0 100644 --- a/hshetl/loaders.py +++ b/hshetl/loaders.py @@ -72,7 +72,7 @@ class AbstractLoader(object): ''' return isinstance(connector, AbstractConnector) - def execute(self, result): + def execute(self, result, dry = False): '''Executes the loader. 
:param hshetl.entities.Result result: @@ -82,10 +82,10 @@ class AbstractLoader(object): if not self.can_execute(self.connector): raise LoaderException('The loader {} can not be executed using this connector {}'.format(self, self.connector)) logging.debug('Loads data: ' + self.__class__.__name__) - self._execute(result) + self._execute(result, dry) logging.debug('Load successfully finished') - def _execute(self, result): + def _execute(self, result, dry = False): '''Executes the loading of data. Distinguishes between update, insert and delete.''' for action in self.operations: getattr(self, '_' + action)(getattr(result, action)) @@ -214,7 +214,7 @@ class SqlAlchemyLoader(AbstractLoader): '''Defines which connector can be handled by this extractor.''' return isinstance(connector, SqlAlchemyConnector) - def _execute(self, result): + def _execute(self, result, dry = False): '''Executes the loading of data. Distinguishes between update, insert and delete''' logging.debug('Will execute the following operations: %s' % self.operations) self.table = Table(self.table_name, @@ -224,7 +224,7 @@ class SqlAlchemyLoader(AbstractLoader): logging.info('Successfully reflected table ' + self.table_name) with self.connector as connection: for action in self.operations: - getattr(self, '_' + action)(connection, getattr(result, action)) + getattr(self, '_' + action)(connection, getattr(result, action), dry) logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)), 'Done!' if 'insert' in self.operations else 'Skipped!', str(len(result.update)), @@ -232,32 +232,41 @@ class SqlAlchemyLoader(AbstractLoader): str(len(result.delete)), 'Done!' 
if 'delete' in self.operations else 'Skipped!')) - def _insert(self, connection, data): + def _insert(self, connection, data, dry): '''Creates a sql insert statement using sqlalchemy and executes it on the connection.''' if len(data) == 0: return values = [] for record in data: values.append(record.to_dict(remove_system_identifiers = False)) statement = self.table.insert() - logging.info('Executing {} with {} '.format(str(statement), str(values))) - connection.execute(statement, values) + if dry: + logging.info('Sql insert query that would be executed: ' + self._generate_human_readable_query(statement, record)) + else: + logging.info('Executing sql insert query: ' + self._generate_human_readable_query(statement, record)) + connection.execute(statement, values) - def _update(self, connection, data): + def _update(self, connection, data, dry): '''Creates a sql update statement using sqlalchemy and executes it on the connection.''' for record in data: statement = self.table.update(values = record.to_dict()) statement = self._add_where_constraints(statement, record) - logging.info('Executing sql update query: ' + str(statement)) - connection.execute(statement) + if dry: + logging.info('Sql update query that would be executed: ' + self._generate_human_readable_query(statement, record)) + else: + logging.info('Executing sql update query: ' + self._generate_human_readable_query(statement, record)) + connection.execute(statement) - def _delete(self, connection, data): + def _delete(self, connection, data, dry): '''Creates a sql delete statement using sqlalchemy and executes it on the connection.''' for record in data: statement = self.table.delete() statement = self._add_where_constraints(statement, record) - logging.info('Executing sql delete query: ' + str(statement)) - connection.execute(statement) + if dry: + logging.info('Sql delete query that would be executed: ' + self._generate_human_readable_query(statement, record)) + else: + logging.info('Executing sql 
delete query: ' + self._generate_human_readable_query(statement, record)) + connection.execute(statement) def _add_where_constraints(self, statement, record): '''Resolves the identifiers for the record and adds the column = value constraints. @@ -276,6 +285,28 @@ class SqlAlchemyLoader(AbstractLoader): raise ConfigurationException('Value for identifier \'' + str(column) + '\' empty.') return statement + def _generate_human_readable_query(self, statement, record): + '''Renders a human readable (normal) sql query from the given statement and record values.''' + query = str(statement) + values = record.to_dict(remove_system_identifiers = False) + prop_types = record.properties + for column in values.keys(): + try: + prop_types[column] + except KeyError: + prop_types[column] = 'unknown because mapped' + if prop_types[column] == 'string': + replace_value = '\'' + unicode(values[column]) + '\'' + elif prop_types[column] == 'binary': + replace_value = '<binary_data>' + else: + replace_value = unicode(values[column]) + if ':' + column + '_1' in query: + query = query.replace(':' + column + '_1', replace_value) + else: + query = query.replace(':' + column, replace_value) + return query + @yamlify class LdapLoader(AbstractLoader): @@ -322,14 +353,14 @@ class LdapLoader(AbstractLoader): '''Defines which connector can be handled by this extractor.''' return isinstance(connector, LdapConnector) - def _execute(self, result): + def _execute(self, result, dry): '''Executes the loading of data. 
Distinguishes between update, insert and delete.''' logging.debug('Loads data: ' + self.__class__.__name__) if None in (self.rdn, self.base, self.objectClass): raise ConfigurationException('base, rdn or objectClass not found - needs to be configured in the connector or loader!') with self.connector as connection: for action in self.operations: - getattr(self, '_' + action)(connection, getattr(result, action)) + getattr(self, '_' + action)(connection, getattr(result, action), dry) logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)), 'Done!' if 'insert' in self.operations else 'Skipped!', str(len(result.update)), @@ -337,7 +368,7 @@ class LdapLoader(AbstractLoader): str(len(result.delete)), 'Done!' if 'delete' in self.operations else 'Skipped!')) - def _update(self, connection, data): + def _update(self, connection, data, dry): '''Updates every dataset one after another.''' for record in data: old_entity_dict = self._prepare_data(record.to_dict(container = record.last_container.target_container)) @@ -346,30 +377,39 @@ class LdapLoader(AbstractLoader): dn = self._get_dn(record.get_container_identifier(), dict_data) attributes = modlist.modifyModlist(old_entity_dict, entity_dict) try: - connection.modify_s(dn, attributes) - logging.info('Modified dn {} with {} '.format(dn, str(attributes))) + if dry: + logging.info('Would modify dn {} with {} '.format(dn, str(attributes))) + else: + connection.modify_s(dn, attributes) + logging.info('Modified dn {} with {} '.format(dn, str(attributes))) except Exception, e: logging.warn('Update failed for dn \'' + dn + '\': ' + str(e)) - def _insert(self, connection, data): + def _insert(self, connection, data, dry): '''Inserts every dataset one after another.''' for record in data: entity_dict = self._prepare_data(record.to_dict()) dn = self._get_dn(record.get_container_identifier(), record.to_dict()) attributes = modlist.addModlist(entity_dict) try: - 
connection.add_s(dn, attributes) - logging.info('Created dn {} with {} '.format(dn, str(attributes))) + if dry: + logging.info('Would create dn {} with {} '.format(dn, str(attributes))) + else: + connection.add_s(dn, attributes) + logging.info('Created dn {} with {} '.format(dn, str(attributes))) except Exception, e: logging.warn('Insert failed for dn \'' + dn + '\': ' + str(e)) - def _delete(self, connection, data): + def _delete(self, connection, data, dry): '''Deletes every entry one after another.''' for record in data: dn = self._get_dn(record.get_container_identifier(), record.to_dict()) try: - connection.delete_s(dn) - logging.info('Deleted dn {}'.format(dn)) + if dry: + logging.info('Would delete dn {}'.format(dn)) + else: + connection.delete_s(dn) + logging.info('Deleted dn {}'.format(dn)) except Exception, e: logging.warn('Delete failed for dn \'' + dn + '\': ' + str(e))