diff --git a/hshetl/jobs.py b/hshetl/jobs.py index f042024e353dc82f9c8dfe368eb4225e067a0214..72a4b78116d3a7c0d10567fa2af5470fac1e022c 100644 --- a/hshetl/jobs.py +++ b/hshetl/jobs.py @@ -674,10 +674,8 @@ class LoadJob(EntityJob, ConnectorJob): '''Load data from the repository into a storage interface.''' result = Result(entity = self.entity, source = self.source) logging.info(unicode(result)) - if isinstance(self.loader, CsvLoader): - self.loader.execute(result) - else: - self.loader.execute(result, dry = self.dry) + self.loader.dry = self.dry + self.loader.execute(result) @yamlify diff --git a/hshetl/loaders.py b/hshetl/loaders.py index 11fc0a988d4123ac41f405d9bf6b1aa2d13ccca0..6324758ce74ec97fb45a8bb2790f3fcdb5065400 100644 --- a/hshetl/loaders.py +++ b/hshetl/loaders.py @@ -52,7 +52,7 @@ class AbstractLoader(object): ''' @NameResolver(connector_repository, 'connector', add = True) - def __init__(self, connector = None, operations = ['insert', 'update', 'delete']): + def __init__(self, connector = None, operations = ['insert', 'update', 'delete'], dry = False): '''Initializes the loader.''' logging.debug('Initializing loader: ' + self.__class__.__name__) if connector is not None and not self.can_execute(connector): @@ -61,6 +61,7 @@ class AbstractLoader(object): '''The connector this loader uses to write the data.''' self.operations = operations '''The configuration that was used to create the loader.''' + self.dry = dry def can_execute(self, connector): '''Return whether this loader can handle the connector. @@ -72,7 +73,7 @@ class AbstractLoader(object): ''' return isinstance(connector, AbstractConnector) - def execute(self, result, dry = False): + def execute(self, result): '''Executes the loader. :param hshetl.entities.Result result: @@ -82,10 +83,10 @@ class AbstractLoader(object): if not self.can_execute(self.connector): raise LoaderException('The loader {} can not be executed using this connector {}'.format(self, self.connector)) logging.debug('Loads data: ' + self.__class__.__name__) - self._execute(result, dry) + self._execute(result) logging.debug('Load successfully finished') - def _execute(self, result, dry = False): + def _execute(self, result): '''Executes the loading of data. Distinguishes between update, insert and delete.''' for action in self.operations: getattr(self, '_' + action)(getattr(result, action)) @@ -162,8 +163,11 @@ class CsvLoader(AbstractLoader): line.append(u'|'.join(map(lambda x: str(x), prop))) else: line.append(unicode(prop)) - logging.debug('Writing line: ' + str(line)) - writer.writerow(line) + if self.dry: + logging.debug('Would write line: ' + str(line)) + else: + logging.debug('Writing line: ' + str(line)) + writer.writerow(line) def _delete(self, data = []): '''This loader can not delete records in the target. Therefore only empty data is allowed''' @@ -214,7 +218,7 @@ class SqlAlchemyLoader(AbstractLoader): '''Defines which connector can be handled by this extractor.''' return isinstance(connector, SqlAlchemyConnector) - def _execute(self, result, dry = False): + def _execute(self, result): '''Executes the loading of data. Distinguishes between update, insert and delete''' logging.debug('Will execute the following operations: %s' % self.operations) self.table = Table(self.table_name, @@ -224,7 +228,7 @@ class SqlAlchemyLoader(AbstractLoader): logging.info('Successfully reflected table ' + self.table_name) with self.connector as connection: for action in self.operations: - getattr(self, '_' + action)(connection, getattr(result, action), dry) + getattr(self, '_' + action)(connection, getattr(result, action)) logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)), 'Done!' if 'insert' in self.operations else 'Skipped!', str(len(result.update)), @@ -232,40 +236,40 @@ class SqlAlchemyLoader(AbstractLoader): str(len(result.delete)), 'Done!' if 'delete' in self.operations else 'Skipped!')) - def _insert(self, connection, data, dry): + def _insert(self, connection, data): '''Creates a sql insert statement using sqlalchemy and executes it on the connection.''' if len(data) == 0: return values = [] for record in data: values.append(record.to_dict(remove_system_identifiers = False)) statement = self.table.insert() - if dry: - logging.info('Sql insert query that would be executed: ' + self._generate_human_readable_query(statement, record)) + if self.dry: + logging.info('Sql insert query that would be executed: ' + self._generate_readable_query(statement, record)) else: - logging.info('Executing sql insert query: ' + self._generate_human_readable_query(statement, record)) + logging.info('Executing sql insert query: ' + self._generate_readable_query(statement, record)) connection.execute(statement, values) - def _update(self, connection, data, dry): + def _update(self, connection, data): '''Creates a sql update statement using sqlalchemy and executes it on the connection.''' for record in data: statement = self.table.update(values = record.to_dict()) statement = self._add_where_constraints(statement, record) - if dry: - logging.info('Sql update query that would be executed: ' + self._generate_human_readable_query(statement, record)) + if self.dry: + logging.info('Sql update query that would be executed: ' + self._generate_readable_query(statement, record)) else: - logging.info('Executing sql update query: ' + self._generate_human_readable_query(statement, record)) + logging.info('Executing sql update query: ' + self._generate_readable_query(statement, record)) connection.execute(statement) - def _delete(self, connection, data, dry): + def _delete(self, connection, data): '''Creates a sql delete statement using sqlalchemy and executes it on the connection.''' for record in data: statement = self.table.delete() statement = self._add_where_constraints(statement, record) - if dry: - logging.info('Sql delete query that would be executed: ' + self._generate_human_readable_query(statement, record.to_dict())) + if self.dry: + logging.info('Sql delete query that would be executed: ' + self._generate_readable_query(statement, record.to_dict())) else: - logging.info('Executing sql delete query: ' + self._generate_human_readable_query(statement, record)) + logging.info('Executing sql delete query: ' + self._generate_readable_query(statement, record)) connection.execute(statement) def _add_where_constraints(self, statement, record): @@ -285,7 +289,7 @@ class SqlAlchemyLoader(AbstractLoader): raise ConfigurationException('Value for identifier \'' + str(column) + '\' empty.') return statement - def _generate_human_readable_query(self, statement, record): + def _generate_readable_query(self, statement, record): '''Renders a human readable (normal) sql query from the given statement and record values.''' query = str(statement) values = record.to_dict(remove_system_identifiers = False) @@ -353,14 +357,14 @@ class LdapLoader(AbstractLoader): '''Defines which connector can be handled by this extractor.''' return isinstance(connector, LdapConnector) - def _execute(self, result, dry): + def _execute(self, result): '''Executes the loading of data. Distinguishes between update, insert and delete.''' logging.debug('Loads data: ' + self.__class__.__name__) if None in (self.rdn, self.base, self.objectClass): raise ConfigurationException('base, rdn or objectClass not found - needs to be configured in the connector or loader!') with self.connector as connection: for action in self.operations: - getattr(self, '_' + action)(connection, getattr(result, action), dry) + getattr(self, '_' + action)(connection, getattr(result, action)) logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)), 'Done!' if 'insert' in self.operations else 'Skipped!', str(len(result.update)), @@ -368,7 +372,7 @@ class LdapLoader(AbstractLoader): str(len(result.delete)), 'Done!' if 'delete' in self.operations else 'Skipped!')) - def _update(self, connection, data, dry): + def _update(self, connection, data): '''Updates every dataset one after another.''' for record in data: old_entity_dict = self._prepare_data(record.to_dict(container = record.last_container.target_container)) @@ -377,7 +381,7 @@ class LdapLoader(AbstractLoader): dn = self._get_dn(record.get_container_identifier(), dict_data) attributes = modlist.modifyModlist(old_entity_dict, entity_dict) try: - if dry: + if self.dry: logging.info('Would modify dn {} with {} '.format(dn, str(attributes))) else: connection.modify_s(dn, attributes) @@ -385,14 +389,14 @@ class LdapLoader(AbstractLoader): except Exception, e: logging.warn('Update failed for dn \'' + dn + '\': ' + str(e)) - def _insert(self, connection, data, dry): + def _insert(self, connection, data): '''Inserts every dataset one after another.''' for record in data: entity_dict = self._prepare_data(record.to_dict()) dn = self._get_dn(record.get_container_identifier(), record.to_dict()) attributes = modlist.addModlist(entity_dict) try: - if dry: + if self.dry: logging.info('Would create dn {} with {} '.format(dn, str(attributes))) else: connection.add_s(dn, attributes) @@ -400,12 +404,12 @@ class LdapLoader(AbstractLoader): except Exception, e: logging.warn('Insert failed for dn \'' + dn + '\': ' + str(e)) - def _delete(self, connection, data, dry): + def _delete(self, connection, data): '''Deletes every entry one after another.''' for record in data: dn = self._get_dn(record.get_container_identifier(), record.to_dict()) try: - if dry: + if self.dry: logging.info('Would delete dn {}'.format(dn)) else: connection.delete_s(dn)