Skip to content
Snippets Groups Projects
Commit c3a183e4 authored by schulmax's avatar schulmax
Browse files

[TASK] Introduces dry run mode

You can now run hshetl with the --dry parameter so that nothing
is loaded into the target. Only the queries that would
be executed will be shown in the log.
Per-job dry run mode is coming later (for now, either all jobs run dry or none do).
parent 2a55fa56
Branches
No related tags found
No related merge requests found
...@@ -8,3 +8,4 @@ dist/ ...@@ -8,3 +8,4 @@ dist/
*.asta.bak *.asta.bak
*.asta.lock *.asta.lock
.metadata/ .metadata/
distribute-0.7.3-py2.7.egg
...@@ -104,6 +104,7 @@ def _argparse(config_file = None, working_directory = None): ...@@ -104,6 +104,7 @@ def _argparse(config_file = None, working_directory = None):
parser.add_argument('-i', '--interactive', action = 'store_true', help = 'Ask before each job execution.') parser.add_argument('-i', '--interactive', action = 'store_true', help = 'Ask before each job execution.')
parser.add_argument('-l', '--logformat', dest = 'logformat', help = 'The format of the logging output.', default = '%(levelname)s: %(message)s') parser.add_argument('-l', '--logformat', dest = 'logformat', help = 'The format of the logging output.', default = '%(levelname)s: %(message)s')
parser.add_argument('-p', '--profile', action = 'store_true') parser.add_argument('-p', '--profile', action = 'store_true')
parser.add_argument('--dry', action = 'store_true', help = 'Build all queries and extract the data but do NOT write (load)')
return parser return parser
...@@ -136,6 +137,7 @@ class Controller(object): ...@@ -136,6 +137,7 @@ class Controller(object):
self.explain = args.explain self.explain = args.explain
self.interactive = args.interactive self.interactive = args.interactive
self.profile = args.profile self.profile = args.profile
self.dry = args.dry
logging.basicConfig(stream = sys.stdout, level = logging.DEBUG if args.verbose else logging.INFO, format = args.logformat) logging.basicConfig(stream = sys.stdout, level = logging.DEBUG if args.verbose else logging.INFO, format = args.logformat)
logging.debug('I am chatty now! Lets shovel data :)') logging.debug('I am chatty now! Lets shovel data :)')
...@@ -180,6 +182,8 @@ class Controller(object): ...@@ -180,6 +182,8 @@ class Controller(object):
'''Build job objects from configuration and execute them.''' '''Build job objects from configuration and execute them.'''
if self.interactive: if self.interactive:
self.make_interactive(self.jobs) self.make_interactive(self.jobs)
if self.dry:
self.make_dry(self.jobs)
try: try:
if self.explain: if self.explain:
self.explain_jobs() self.explain_jobs()
...@@ -219,6 +223,14 @@ class Controller(object): ...@@ -219,6 +223,14 @@ class Controller(object):
if isinstance(job, JobList): if isinstance(job, JobList):
self.make_interactive(job) self.make_interactive(job)
def make_dry(self, jobs):
    '''Recursively flags every job for dry-run execution.

    Walks the given iterable of jobs, sets ``dry = True`` on each one and
    descends into nested JobList instances so the whole job tree is marked.
    '''
    for current_job in jobs:
        logging.debug('Mark job as dry-run...')
        current_job.dry = True
        # A JobList is itself iterable and contains further jobs; recurse.
        if isinstance(current_job, JobList):
            self.make_dry(current_job)
def explain_jobs(self): def explain_jobs(self):
'''Traverse the job tree and print out jobs name and description in a nice readable format.''' '''Traverse the job tree and print out jobs name and description in a nice readable format.'''
self.render_tree_lvl(self.jobs) self.render_tree_lvl(self.jobs)
......
...@@ -42,7 +42,7 @@ import sys ...@@ -42,7 +42,7 @@ import sys
from connectors import SqlQueryLanguageSupportMixin, SqlAlchemyConnector, connector_repository from connectors import SqlQueryLanguageSupportMixin, SqlAlchemyConnector, connector_repository
from extractors import AbstractExtractor from extractors import AbstractExtractor
from transformers import CompareTransformer, AbstractTransformer from transformers import CompareTransformer, AbstractTransformer
from loaders import AbstractLoader from loaders import AbstractLoader, CsvLoader
from entities import entity_repository, Container, Result, COLLISION_HANDLING_BREAKALL from entities import entity_repository, Container, Result, COLLISION_HANDLING_BREAKALL
from exc import ConfigurationException, JobException from exc import ConfigurationException, JobException
...@@ -84,7 +84,7 @@ class Job(object): ...@@ -84,7 +84,7 @@ class Job(object):
description = u'A Job that does nothing' description = u'A Job that does nothing'
'''The default description of the job.''' '''The default description of the job.'''
def __init__(self, name = None, description = None, interactive = False): def __init__(self, name = None, description = None, interactive = False, dry = False):
'''Create a job instance. '''Create a job instance.
Parameters Parameters
...@@ -102,6 +102,8 @@ class Job(object): ...@@ -102,6 +102,8 @@ class Job(object):
'''The description of the job.''' '''The description of the job.'''
self.interactive = interactive self.interactive = interactive
'''Indicates whether the job will prompt before its execution, or not.''' '''Indicates whether the job will prompt before its execution, or not.'''
self.dry = dry
'''Indicates whether the job will actually execute (False) load queries or not (True)'''
self.executed = False self.executed = False
'''Indicates if the job was executed''' '''Indicates if the job was executed'''
...@@ -657,7 +659,7 @@ class LoadJob(EntityJob, ConnectorJob): ...@@ -657,7 +659,7 @@ class LoadJob(EntityJob, ConnectorJob):
'''The default description of this job.''' '''The default description of this job.'''
def __init__(self, loader, source, *args, **kwargs): def __init__(self, loader, source, *args, **kwargs):
'''The constructor for extraction jobs.''' '''The constructor for load jobs.'''
super(LoadJob, self).__init__(*args, **kwargs) super(LoadJob, self).__init__(*args, **kwargs)
if not isinstance(loader, AbstractLoader): if not isinstance(loader, AbstractLoader):
raise ConfigurationException('The loader needs to be an instance of hshetl.loaders.AbstractLoader. Encoutered: {}'.format(str(type(loader)))) raise ConfigurationException('The loader needs to be an instance of hshetl.loaders.AbstractLoader. Encoutered: {}'.format(str(type(loader))))
...@@ -672,7 +674,10 @@ class LoadJob(EntityJob, ConnectorJob): ...@@ -672,7 +674,10 @@ class LoadJob(EntityJob, ConnectorJob):
'''Load data from the repository into a storage interface.''' '''Load data from the repository into a storage interface.'''
result = Result(entity = self.entity, source = self.source) result = Result(entity = self.entity, source = self.source)
logging.info(unicode(result)) logging.info(unicode(result))
if isinstance(self.loader, CsvLoader):
self.loader.execute(result) self.loader.execute(result)
else:
self.loader.execute(result, dry = self.dry)
@yamlify @yamlify
......
...@@ -72,7 +72,7 @@ class AbstractLoader(object): ...@@ -72,7 +72,7 @@ class AbstractLoader(object):
''' '''
return isinstance(connector, AbstractConnector) return isinstance(connector, AbstractConnector)
def execute(self, result): def execute(self, result, dry = False):
'''Executes the loader. '''Executes the loader.
:param hshetl.entities.Result result: :param hshetl.entities.Result result:
...@@ -82,10 +82,10 @@ class AbstractLoader(object): ...@@ -82,10 +82,10 @@ class AbstractLoader(object):
if not self.can_execute(self.connector): if not self.can_execute(self.connector):
raise LoaderException('The loader {} can not be executed using this connector {}'.format(self, self.connector)) raise LoaderException('The loader {} can not be executed using this connector {}'.format(self, self.connector))
logging.debug('Loads data: ' + self.__class__.__name__) logging.debug('Loads data: ' + self.__class__.__name__)
self._execute(result) self._execute(result, dry)
logging.debug('Load successfully finished') logging.debug('Load successfully finished')
def _execute(self, result): def _execute(self, result, dry = False):
'''Executes the loading of data. Distinguishes between update, insert and delete.''' '''Executes the loading of data. Distinguishes between update, insert and delete.'''
for action in self.operations: for action in self.operations:
getattr(self, '_' + action)(getattr(result, action)) getattr(self, '_' + action)(getattr(result, action))
...@@ -214,7 +214,7 @@ class SqlAlchemyLoader(AbstractLoader): ...@@ -214,7 +214,7 @@ class SqlAlchemyLoader(AbstractLoader):
'''Defines which connector can be handled by this extractor.''' '''Defines which connector can be handled by this extractor.'''
return isinstance(connector, SqlAlchemyConnector) return isinstance(connector, SqlAlchemyConnector)
def _execute(self, result): def _execute(self, result, dry = False):
'''Executes the loading of data. Distinguishes between update, insert and delete''' '''Executes the loading of data. Distinguishes between update, insert and delete'''
logging.debug('Will execute the following operations: %s' % self.operations) logging.debug('Will execute the following operations: %s' % self.operations)
self.table = Table(self.table_name, self.table = Table(self.table_name,
...@@ -224,7 +224,7 @@ class SqlAlchemyLoader(AbstractLoader): ...@@ -224,7 +224,7 @@ class SqlAlchemyLoader(AbstractLoader):
logging.info('Successfully reflected table ' + self.table_name) logging.info('Successfully reflected table ' + self.table_name)
with self.connector as connection: with self.connector as connection:
for action in self.operations: for action in self.operations:
getattr(self, '_' + action)(connection, getattr(result, action)) getattr(self, '_' + action)(connection, getattr(result, action), dry)
logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)), logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)),
'Done!' if 'insert' in self.operations else 'Skipped!', 'Done!' if 'insert' in self.operations else 'Skipped!',
str(len(result.update)), str(len(result.update)),
...@@ -232,31 +232,40 @@ class SqlAlchemyLoader(AbstractLoader): ...@@ -232,31 +232,40 @@ class SqlAlchemyLoader(AbstractLoader):
str(len(result.delete)), str(len(result.delete)),
'Done!' if 'delete' in self.operations else 'Skipped!')) 'Done!' if 'delete' in self.operations else 'Skipped!'))
def _insert(self, connection, data): def _insert(self, connection, data, dry):
'''Creates a sql insert statement using sqlalchemy and executes it on the connection.''' '''Creates a sql insert statement using sqlalchemy and executes it on the connection.'''
if len(data) == 0: return if len(data) == 0: return
values = [] values = []
for record in data: for record in data:
values.append(record.to_dict(remove_system_identifiers = False)) values.append(record.to_dict(remove_system_identifiers = False))
statement = self.table.insert() statement = self.table.insert()
logging.info('Executing {} with {} '.format(str(statement), str(values))) if dry:
logging.info('Sql insert query that would be executed: ' + self._generate_human_readable_query(statement, record))
else:
logging.info('Executing sql insert query: ' + self._generate_human_readable_query(statement, record))
connection.execute(statement, values) connection.execute(statement, values)
def _update(self, connection, data): def _update(self, connection, data, dry):
'''Creates a sql update statement using sqlalchemy and executes it on the connection.''' '''Creates a sql update statement using sqlalchemy and executes it on the connection.'''
for record in data: for record in data:
statement = self.table.update(values = record.to_dict()) statement = self.table.update(values = record.to_dict())
statement = self._add_where_constraints(statement, record) statement = self._add_where_constraints(statement, record)
logging.info('Executing sql update query: ' + str(statement)) if dry:
logging.info('Sql update query that would be executed: ' + self._generate_human_readable_query(statement, record))
else:
logging.info('Executing sql update query: ' + self._generate_human_readable_query(statement, record))
connection.execute(statement) connection.execute(statement)
def _delete(self, connection, data, dry):
    '''Creates a sql delete statement using sqlalchemy and executes it on the connection.

    :param connection: open database connection to execute against
    :param data: iterable of records to delete
    :param bool dry: when True, only log the query that would run; do not execute
    '''
    for record in data:
        statement = self.table.delete()
        statement = self._add_where_constraints(statement, record)
        if dry:
            # Bug fix: pass the record itself, not record.to_dict() —
            # _generate_human_readable_query calls record.to_dict() and
            # record.properties, so a plain dict would raise AttributeError.
            logging.info('Sql delete query that would be executed: ' + self._generate_human_readable_query(statement, record))
        else:
            logging.info('Executing sql delete query: ' + self._generate_human_readable_query(statement, record))
            connection.execute(statement)
def _add_where_constraints(self, statement, record): def _add_where_constraints(self, statement, record):
...@@ -276,6 +285,28 @@ class SqlAlchemyLoader(AbstractLoader): ...@@ -276,6 +285,28 @@ class SqlAlchemyLoader(AbstractLoader):
raise ConfigurationException('Value for identifier \'' + str(column) + '\' empty.') raise ConfigurationException('Value for identifier \'' + str(column) + '\' empty.')
return statement return statement
def _generate_human_readable_query(self, statement, record):
    '''Renders a human readable (normal) sql query from the given statement and record values.

    Substitutes the statement's bound-parameter placeholders (``:col`` or the
    ``:col_1`` form sqlalchemy emits for where-clause constraints) with the
    record's actual values, quoting string values and masking binary data.

    :param statement: sqlalchemy statement whose str() form contains placeholders
    :param record: record providing ``to_dict()`` values and a ``properties`` type map
    :returns: the query text with placeholders replaced by literal values
    '''
    query = str(statement)
    values = record.to_dict(remove_system_identifiers = False)
    prop_types = record.properties
    for column in values.keys():
        # Bug fix: the original probed prop_types[column] and, on KeyError,
        # wrote 'unknown because mapped' back into record.properties — a
        # rendering helper must not mutate the record. Use a local lookup.
        prop_type = prop_types.get(column, 'unknown because mapped')
        if prop_type == 'string':
            replace_value = '\'' + unicode(values[column]) + '\''
        elif prop_type == 'binary':
            replace_value = '<binary_data>'
        else:
            replace_value = unicode(values[column])
        # NOTE(review): plain string replace can corrupt the output when one
        # column name is a prefix of another (e.g. :id vs :identifier) — confirm
        # column naming rules, or switch to a regex with a word boundary.
        if ':' + column + '_1' in query:
            query = query.replace(':' + column + '_1', replace_value)
        else:
            query = query.replace(':' + column, replace_value)
    return query
@yamlify @yamlify
class LdapLoader(AbstractLoader): class LdapLoader(AbstractLoader):
...@@ -322,14 +353,14 @@ class LdapLoader(AbstractLoader): ...@@ -322,14 +353,14 @@ class LdapLoader(AbstractLoader):
'''Defines which connector can be handled by this extractor.''' '''Defines which connector can be handled by this extractor.'''
return isinstance(connector, LdapConnector) return isinstance(connector, LdapConnector)
def _execute(self, result): def _execute(self, result, dry):
'''Executes the loading of data. Distinguishes between update, insert and delete.''' '''Executes the loading of data. Distinguishes between update, insert and delete.'''
logging.debug('Loads data: ' + self.__class__.__name__) logging.debug('Loads data: ' + self.__class__.__name__)
if None in (self.rdn, self.base, self.objectClass): if None in (self.rdn, self.base, self.objectClass):
raise ConfigurationException('base, rdn or objectClass not found - needs to be configured in the connector or loader!') raise ConfigurationException('base, rdn or objectClass not found - needs to be configured in the connector or loader!')
with self.connector as connection: with self.connector as connection:
for action in self.operations: for action in self.operations:
getattr(self, '_' + action)(connection, getattr(result, action)) getattr(self, '_' + action)(connection, getattr(result, action), dry)
logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)), logging.info('%s data sets inserted (%s), %s data sets updated (%s) , %s data sets deleted (%s).' % (str(len(result.insert)),
'Done!' if 'insert' in self.operations else 'Skipped!', 'Done!' if 'insert' in self.operations else 'Skipped!',
str(len(result.update)), str(len(result.update)),
...@@ -337,7 +368,7 @@ class LdapLoader(AbstractLoader): ...@@ -337,7 +368,7 @@ class LdapLoader(AbstractLoader):
str(len(result.delete)), str(len(result.delete)),
'Done!' if 'delete' in self.operations else 'Skipped!')) 'Done!' if 'delete' in self.operations else 'Skipped!'))
def _update(self, connection, data): def _update(self, connection, data, dry):
'''Updates every dataset one after another.''' '''Updates every dataset one after another.'''
for record in data: for record in data:
old_entity_dict = self._prepare_data(record.to_dict(container = record.last_container.target_container)) old_entity_dict = self._prepare_data(record.to_dict(container = record.last_container.target_container))
...@@ -346,28 +377,37 @@ class LdapLoader(AbstractLoader): ...@@ -346,28 +377,37 @@ class LdapLoader(AbstractLoader):
dn = self._get_dn(record.get_container_identifier(), dict_data) dn = self._get_dn(record.get_container_identifier(), dict_data)
attributes = modlist.modifyModlist(old_entity_dict, entity_dict) attributes = modlist.modifyModlist(old_entity_dict, entity_dict)
try: try:
if dry:
logging.info('Would modify dn {} with {} '.format(dn, str(attributes)))
else:
connection.modify_s(dn, attributes) connection.modify_s(dn, attributes)
logging.info('Modified dn {} with {} '.format(dn, str(attributes))) logging.info('Modified dn {} with {} '.format(dn, str(attributes)))
except Exception, e: except Exception, e:
logging.warn('Update failed for dn \'' + dn + '\': ' + str(e)) logging.warn('Update failed for dn \'' + dn + '\': ' + str(e))
def _insert(self, connection, data): def _insert(self, connection, data, dry):
'''Inserts every dataset one after another.''' '''Inserts every dataset one after another.'''
for record in data: for record in data:
entity_dict = self._prepare_data(record.to_dict()) entity_dict = self._prepare_data(record.to_dict())
dn = self._get_dn(record.get_container_identifier(), record.to_dict()) dn = self._get_dn(record.get_container_identifier(), record.to_dict())
attributes = modlist.addModlist(entity_dict) attributes = modlist.addModlist(entity_dict)
try: try:
if dry:
logging.info('Would create dn {} with {} '.format(dn, str(attributes)))
else:
connection.add_s(dn, attributes) connection.add_s(dn, attributes)
logging.info('Created dn {} with {} '.format(dn, str(attributes))) logging.info('Created dn {} with {} '.format(dn, str(attributes)))
except Exception, e: except Exception, e:
logging.warn('Insert failed for dn \'' + dn + '\': ' + str(e)) logging.warn('Insert failed for dn \'' + dn + '\': ' + str(e))
def _delete(self, connection, data): def _delete(self, connection, data, dry):
'''Deletes every entry one after another.''' '''Deletes every entry one after another.'''
for record in data: for record in data:
dn = self._get_dn(record.get_container_identifier(), record.to_dict()) dn = self._get_dn(record.get_container_identifier(), record.to_dict())
try: try:
if dry:
logging.info('Would delete dn {}'.format(dn))
else:
connection.delete_s(dn) connection.delete_s(dn)
logging.info('Deleted dn {}'.format(dn)) logging.info('Deleted dn {}'.format(dn))
except Exception, e: except Exception, e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment