diff --git a/hshetl/cli.py b/hshetl/cli.py
index 3e5bb02b97847db2bcc34d762c514caea81ec614..31ecf1808388944bb461d89347e300a273082131 100644
--- a/hshetl/cli.py
+++ b/hshetl/cli.py
@@ -163,28 +163,15 @@ class Controller(object):
'''Resolves the name of the pid file related to this config.'''
return self.abs_config_file + '.pid'
- def configure(self):
- '''Reads the yaml configuration from file and sets internal variables.'''
- try:
- self.check_pid_file()
- self.jobs.extend(self._configuration_parser.parse(self.abs_config_file))
- except IOError as ioe:
- sys.exit('Can\'t read configuration file: {}'.format(str(ioe)))
- except yaml.parser.ParserError as pe:
- sys.exit('Error while parsing YAML configuration: {}'.format(str(pe)))
- except ConfigurationException as ce:
- sys.exit('Error in configuration: {}'.format(str(ce)))
- except DuplicatedProcessException as dpe:
- sys.exit('Error on execution: {}'.format(str(dpe)))
- return self
-
def execute(self):
'''Build job objects from configuration and execute them.'''
- if self.interactive:
- self.make_interactive(self.jobs)
- if self.dry:
- self.make_dry(self.jobs)
try:
+ self.check_pid_file()
+ self.jobs.extend(self._configuration_parser.parse(self.abs_config_file))
+ if self.interactive:
+ self.make_interactive(self.jobs)
+ if self.dry:
+ self.make_dry(self.jobs)
if self.explain:
self.explain_jobs()
else:
@@ -195,6 +182,14 @@ class Controller(object):
self.jobs.execute()
except KeyboardInterrupt:
logging.critical('The execution was manually interrupted.')
+ except DuplicatedProcessException as dpe:
+ logging.critical('Error on execution: {}'.format(str(dpe)))
+ except IOError as ioe:
+ logging.critical('Can\'t read configuration file: {}'.format(str(ioe)))
+ except yaml.parser.ParserError as pe:
+ logging.critical('Error while parsing YAML configuration: {}'.format(str(pe)))
+ except ConfigurationException as ce:
+ logging.critical('Error in configuration: {}'.format(str(ce)))
finally:
self.shutdown()
logging.info('The whole execution took {} seconds'.format(str(time.time() - self.start_time)))
@@ -251,7 +246,7 @@ class Controller(object):
shutil.copy(example_config, self.working_directory)
def main():
- Controller().configure().execute()
+ Controller().execute()
if __name__ == "__main__":
main()
diff --git a/hshetl/extractors.py b/hshetl/extractors.py
index a5bcdc05562393e481a2bdc882c150764e00b29b..16a208f22574b60bb9778ca2e6aae81e90645ab8 100644
--- a/hshetl/extractors.py
+++ b/hshetl/extractors.py
@@ -86,7 +86,7 @@ class AbstractExtractor(object):
raise NotImplementedError
def __unicode__(self):
- return u'<AbstractExtractor with connector: %s>' % self.connector.name
+ return u'<AbstractExtractor with connector: %s>' % self.connector.name if self.connector is not None else 'Fake'
@yamlify
diff --git a/hshetl/test/functional/test_copy.py b/hshetl/test/functional/test_copy.py
index b922c52e9fd184dd5a842f6d0723f7b5b5b48c86..9643644e5071af3cd1054ba5db70f2e6e8d6b57e 100644
--- a/hshetl/test/functional/test_copy.py
+++ b/hshetl/test/functional/test_copy.py
@@ -15,5 +15,5 @@ class TestCopyJob(TestBaseClass):
def test_job(self):
- Controller().configure().execute()
+ Controller().execute()
self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_person.csv', self.test_working_directory + 'person.csv'))
diff --git a/hshetl/test/functional/test_copy_faker2file.py b/hshetl/test/functional/test_copy_faker2file.py
index 1ee80ec3a2d72ac147b96eaaa1c056a1b93915c3..b4ff2f5200bc03bb1bb5e74404c28ea36f063cdb 100644
--- a/hshetl/test/functional/test_copy_faker2file.py
+++ b/hshetl/test/functional/test_copy_faker2file.py
@@ -15,5 +15,5 @@ class TestFakerCopyJob(TestBaseClass):
def test_job(self):
- Controller().configure().execute()
+ Controller().execute()
self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_fakeperson.csv', self.test_working_directory + 'fakeperson.csv'))
diff --git a/hshetl/test/functional/test_copy_file2file.py b/hshetl/test/functional/test_copy_file2file.py
index ae7fdaf950f5d39c2343749de3363e46ac5b58de..ccc0236ec1dbbc970de01ce331843c43ca7f37ac 100644
--- a/hshetl/test/functional/test_copy_file2file.py
+++ b/hshetl/test/functional/test_copy_file2file.py
@@ -15,5 +15,5 @@ class TestCopyFileJob(TestBaseClass):
def test_job(self):
- Controller().configure().execute()
+ Controller().execute()
self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_output.csv', self.test_working_directory + 'output.csv'))
diff --git a/hshetl/test/functional/test_eetl_file2sqliteWithMultipleIdentifiers.py b/hshetl/test/functional/test_eetl_file2sqliteWithMultipleIdentifiers.py
index 929c99e7eaee2bbd2284c5c160affa608fe2507f..455f7f1fdd0207acc3e40f9ef17731e2b180d58c 100644
--- a/hshetl/test/functional/test_eetl_file2sqliteWithMultipleIdentifiers.py
+++ b/hshetl/test/functional/test_eetl_file2sqliteWithMultipleIdentifiers.py
@@ -32,7 +32,7 @@ class TestFileToSQLiteWithMultipleIdentifiers(TestBaseClass):
self.cursor.execute('INSERT INTO user (foreign_id,inactive,first_name,name,telephone) VALUES (104, 1, \'Eagan\', \'Hutchinson\', 93829462)')
self.cursor.execute('INSERT INTO user (foreign_id,inactive,first_name,name,telephone) VALUES (108, 1, \'Rahim\', \'Wilson\', 14499323)')
self.conn.commit()
- Controller().configure().execute()
+ Controller().execute()
self.cursor.execute('SELECT * FROM user ORDER BY foreign_id ASC')
for fetched_data_set in self.cursor:
self.assertEqual(fetched_data_set, expected_result.pop(0))
\ No newline at end of file
diff --git a/hshetl/test/functional/test_query_sqlquery2sqlite.py b/hshetl/test/functional/test_query_sqlquery2sqlite.py
index 312c7ab0423a8096407df35ab207011dba010a80..0bcf0f9b1e2e9ba3db26778276c74da551877002 100644
--- a/hshetl/test/functional/test_query_sqlquery2sqlite.py
+++ b/hshetl/test/functional/test_query_sqlquery2sqlite.py
@@ -20,7 +20,7 @@ class TestSqlQueryJob(TestBaseClass):
def test_job_creates_table_and_inserts_data_as_configured(self):
- Controller().configure().execute()
+ Controller().execute()
self.cursor.execute('SELECT * FROM user')
for result in self.cursor:
pass
diff --git a/hshetl/test/functional/test_sync_file2sqlite.py b/hshetl/test/functional/test_sync_file2sqlite.py
index 8303de2b5644fcbe66dde62be4e4ccd0a5399893..494cc1518124c01e4bad96cc3e8e5d27af7bc436 100644
--- a/hshetl/test/functional/test_sync_file2sqlite.py
+++ b/hshetl/test/functional/test_sync_file2sqlite.py
@@ -23,7 +23,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
def test_batch_run_inserts_right_amount_of_data_sets(self):
- Controller().configure().execute()
+ Controller().execute()
self.cursor.execute('SELECT * FROM user')
data_set_count = 0
for _ in self.cursor:
@@ -34,7 +34,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
csv_file = open(self.test_working_directory + '/data/person.csv', 'w')
csv_file.write('p_num,name,retired,telephone\n100,Hanae Ferguson,1,67466035')
csv_file.close()
- Controller().configure().execute()
+ Controller().execute()
self.cursor.execute('SELECT * FROM user')
self.assertEqual(self.cursor.fetchone(), (1, u'Hanae Ferguson', 1, 100, 67466035))
@@ -45,7 +45,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)')
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(101, 1, \'Willi Williams\', 67466035,21)')
self.conn.commit()
- Controller().configure().execute()
+ Controller().execute()
self.cursor.execute('SELECT * FROM user')
self.assertEqual(self.cursor.fetchone(), (20, u'Hanna Ferguson', 0, 100, 12345678))
self.assertEqual(self.cursor.fetchone(), (21, u'Willy Williams', 0, 101, 87654321))
@@ -56,7 +56,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
csv_file.close()
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)')
self.conn.commit()
- Controller().configure().execute()
+ Controller().execute()
self.cursor.execute('SELECT * FROM user')
self.assertEqual(self.cursor.fetchone(), None)
@@ -73,7 +73,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone) VALUES(104, 1, \'Eagan Hutchinson\', 93829462)')
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone) VALUES(108, 1, \'Rahim Wilson\', 14499323)')
self.conn.commit()
- Controller().configure().execute()
+ Controller().execute()
self.cursor.execute('SELECT * FROM user ORDER BY id ASC')
for fetched_data_set in self.cursor:
self.assertEqual(fetched_data_set, expected_result.pop(0))
diff --git a/hshetl/test/unit/test_cli.py b/hshetl/test/unit/test_cli.py
index 705a38ed1f3bf965e566d6a34c42370c56d9efc4..9b641669fbecc3386d0b361233c07e86cd9fff2c 100644
--- a/hshetl/test/unit/test_cli.py
+++ b/hshetl/test/unit/test_cli.py
@@ -254,15 +254,5 @@ class TestController(unittest.TestCase):
chdir_patch.assert_called_once_with('/foo/bar/batz')
self.assertEqual(controller.working_directory, '/foo/bar/batz')
- @patch('os.chdir')
- def test_configure_reads_configuration_and_fetches_objects(self, chdir_patch):
- controller = Controller()
- controller._configuration_parser = Mock(spec = ConfigurationParser)
- job = Mock(spec = jobs.ConnectorJob)
- m_jobs = [job]
- controller._configuration_parser.parse.return_value = m_jobs
- controller.configure()
- self.assertEqual(controller.jobs[0], job)
-
if __name__ == "__main__":
unittest.main()
diff --git a/hshetl/test/unit/test_entities.py b/hshetl/test/unit/test_entities.py
index 4687bd156f5a3a8b01a51c15abfc9fe5bb975441..da22ca780bedf56d92005b219e5801733ec46bcd 100644
--- a/hshetl/test/unit/test_entities.py
+++ b/hshetl/test/unit/test_entities.py
@@ -107,28 +107,44 @@ class ContainerTest(unittest.TestCase):
self.assertEqual(self.container.map(record, True), expected)
def test_container_collision_handling(self):
- record = Mock(spec = Record)
- self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, record)
+ join_identifier = ['foo']
+ properties = {'foo': 'string', 'bar': 'int'}
+ containers = Mock()
+ containers.properties = properties
+ container_fields = ['foo']
+ mapped_record = Mock(spec = Record)
+ second_container = Mock(spec = Container)
+ self.container._create_collision_record = mapped_record
+ self.container._create_collision_record.return_value = mapped_record
+ self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.container.collision_handling = COLLISION_HANDLING_BREAKCONTAINER
- self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, record)
+ self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.container.collision_handling = COLLISION_HANDLING_BREAKJOIN
- self.container.container_collide(record)
- self.assertEquals(self.container.collisions[0], record)
+ self.container.container_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
+ self.assertEquals(self.container.collisions[0], mapped_record)
self.container.collision_handling = COLLISION_HANDLING_BREAKNEVER
- self.container.container_collide(record)
- self.assertEquals(self.container.collisions[1], record)
+ self.container.container_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
+ self.assertEquals(self.container.collisions[1], mapped_record)
def test_container_join_handling(self):
- record = Mock(spec = Record)
- self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, record)
+ join_identifier = ['foo']
+ properties = {'foo': 'string', 'bar': 'int'}
+ containers = Mock()
+ containers.properties = properties
+ container_fields = ['foo']
+ mapped_record = Mock(spec = Record)
+ second_container = Mock(spec = Container)
+ self.container._create_collision_record = mapped_record
+ self.container._create_collision_record.return_value = mapped_record
+ self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.container.collision_handling = COLLISION_HANDLING_BREAKJOIN
- self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, record)
+ self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.container.collision_handling = COLLISION_HANDLING_BREAKCONTAINER
- self.container.join_collide(record)
- self.assertEquals(self.container.join_collisions[0], record)
+ self.container.join_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
+ self.assertEquals(self.container.join_collisions[0], mapped_record)
self.container.collision_handling = COLLISION_HANDLING_BREAKNEVER
- self.container.join_collide(record)
- self.assertEquals(self.container.join_collisions[1], record)
+ self.container.join_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
+ self.assertEquals(self.container.join_collisions[1], mapped_record)
class PropertyConverterTest(unittest.TestCase):
diff --git a/hshetl/test/unit/test_loaders.py b/hshetl/test/unit/test_loaders.py
index c63cc4dccb98137dc389240199c29e72aabed686..269e7ca78c67e1ce020f65c853cb1bc1064ae9bb 100644
--- a/hshetl/test/unit/test_loaders.py
+++ b/hshetl/test/unit/test_loaders.py
@@ -118,6 +118,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
self.result.insert[0].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][0])
self.result.insert[1].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][1])
self.result.insert[2].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][2])
+ self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
self.etl_loader._insert(self.sql_connection, self.result.insert)
self.table_mock.insert.assert_called_once_with()
self.sql_connection.execute.assert_called_once_with(insert_statement, test.fixtures['loader_test_loading'])
@@ -140,6 +141,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
self.result.update[0].to_dict = Mock(return_value = fixture[0])
self.result.update[1].to_dict = Mock(return_value = fixture[1])
self.result.update[2].to_dict = Mock(return_value = fixture[2])
+ self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
self.etl_loader._update(self.sql_connection, self.result.update)
self.table_mock.update.assert_has_calls([call(values = fixture[0]),
call(values = fixture[1]),
@@ -167,6 +169,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
self.result.delete[0].get_container_identifier = Mock(return_value = fixture[0])
self.result.delete[1].get_container_identifier = Mock(return_value = fixture[1])
self.result.delete[2].get_container_identifier = Mock(return_value = fixture[2])
+ self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
self.etl_loader._delete(self.sql_connection, self.result.delete)
self.table_mock.delete.assert_has_calls([call(), call(), call()])
where_calls = [call(table_delete_side_effect[0], self.result.delete[0]),