Skip to content
Snippets Groups Projects
Commit c6faa401 authored by schulmax's avatar schulmax
Browse files

[TASK] PID handling improved

Before this commit, there could be a pid file present
even if there was no more hshetl process running. This
was due to the pid file handling being done before the config
validation. So if hshetl stopped because of config
problems the pid file would not be deleted.
Also fixes the unittests. (copy_faker2file fails)
parent 432fa3b3
Branches
No related tags found
No related merge requests found
...@@ -163,28 +163,15 @@ class Controller(object): ...@@ -163,28 +163,15 @@ class Controller(object):
'''Resolves the name of the pid file related to this config.''' '''Resolves the name of the pid file related to this config.'''
return self.abs_config_file + '.pid' return self.abs_config_file + '.pid'
def configure(self): def execute(self):
'''Reads the yaml configuration from file and sets internal variables.''' '''Build job objects from configuration and execute them.'''
try: try:
self.check_pid_file() self.check_pid_file()
self.jobs.extend(self._configuration_parser.parse(self.abs_config_file)) self.jobs.extend(self._configuration_parser.parse(self.abs_config_file))
except IOError as ioe:
sys.exit('Can\'t read configuration file: {}'.format(str(ioe)))
except yaml.parser.ParserError as pe:
sys.exit('Error while parsing YAML configuration: {}'.format(str(pe)))
except ConfigurationException as ce:
sys.exit('Error in configuration: {}'.format(str(ce)))
except DuplicatedProcessException as dpe:
sys.exit('Error on execution: {}'.format(str(dpe)))
return self
def execute(self):
'''Build job objects from configuration and execute them.'''
if self.interactive: if self.interactive:
self.make_interactive(self.jobs) self.make_interactive(self.jobs)
if self.dry: if self.dry:
self.make_dry(self.jobs) self.make_dry(self.jobs)
try:
if self.explain: if self.explain:
self.explain_jobs() self.explain_jobs()
else: else:
...@@ -195,6 +182,14 @@ class Controller(object): ...@@ -195,6 +182,14 @@ class Controller(object):
self.jobs.execute() self.jobs.execute()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.critical('The execution was manually interrupted.') logging.critical('The execution was manually interrupted.')
except DuplicatedProcessException as dpe:
logging.critical('Error on execution: {}'.format(str(dpe)))
except IOError as ioe:
logging.critical('Can\'t read configuration file: {}'.format(str(ioe)))
except yaml.parser.ParserError as pe:
logging.critical('Error while parsing YAML configuration: {}'.format(str(pe)))
except ConfigurationException as ce:
logging.critical('Error in configuration: {}'.format(str(ce)))
finally: finally:
self.shutdown() self.shutdown()
logging.info('The whole execution took {} seconds'.format(str(time.time() - self.start_time))) logging.info('The whole execution took {} seconds'.format(str(time.time() - self.start_time)))
...@@ -251,7 +246,7 @@ class Controller(object): ...@@ -251,7 +246,7 @@ class Controller(object):
shutil.copy(example_config, self.working_directory) shutil.copy(example_config, self.working_directory)
def main(): def main():
Controller().configure().execute() Controller().execute()
if __name__ == "__main__": if __name__ == "__main__":
main() main()
...@@ -86,7 +86,7 @@ class AbstractExtractor(object): ...@@ -86,7 +86,7 @@ class AbstractExtractor(object):
raise NotImplementedError raise NotImplementedError
def __unicode__(self): def __unicode__(self):
return u'<AbstractExtractor with connector: %s>' % self.connector.name return u'<AbstractExtractor with connector: %s>' % self.connector.name if self.connector is not None else 'Fake'
@yamlify @yamlify
......
...@@ -15,5 +15,5 @@ class TestCopyJob(TestBaseClass): ...@@ -15,5 +15,5 @@ class TestCopyJob(TestBaseClass):
def test_job(self): def test_job(self):
Controller().configure().execute() Controller().execute()
self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_person.csv', self.test_working_directory + 'person.csv')) self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_person.csv', self.test_working_directory + 'person.csv'))
...@@ -15,5 +15,5 @@ class TestFakerCopyJob(TestBaseClass): ...@@ -15,5 +15,5 @@ class TestFakerCopyJob(TestBaseClass):
def test_job(self): def test_job(self):
Controller().configure().execute() Controller().execute()
self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_fakeperson.csv', self.test_working_directory + 'fakeperson.csv')) self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_fakeperson.csv', self.test_working_directory + 'fakeperson.csv'))
...@@ -15,5 +15,5 @@ class TestCopyFileJob(TestBaseClass): ...@@ -15,5 +15,5 @@ class TestCopyFileJob(TestBaseClass):
def test_job(self): def test_job(self):
Controller().configure().execute() Controller().execute()
self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_output.csv', self.test_working_directory + 'output.csv')) self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_output.csv', self.test_working_directory + 'output.csv'))
...@@ -32,7 +32,7 @@ class TestFileToSQLiteWithMultipleIdentifiers(TestBaseClass): ...@@ -32,7 +32,7 @@ class TestFileToSQLiteWithMultipleIdentifiers(TestBaseClass):
self.cursor.execute('INSERT INTO user (foreign_id,inactive,first_name,name,telephone) VALUES (104, 1, \'Eagan\', \'Hutchinson\', 93829462)') self.cursor.execute('INSERT INTO user (foreign_id,inactive,first_name,name,telephone) VALUES (104, 1, \'Eagan\', \'Hutchinson\', 93829462)')
self.cursor.execute('INSERT INTO user (foreign_id,inactive,first_name,name,telephone) VALUES (108, 1, \'Rahim\', \'Wilson\', 14499323)') self.cursor.execute('INSERT INTO user (foreign_id,inactive,first_name,name,telephone) VALUES (108, 1, \'Rahim\', \'Wilson\', 14499323)')
self.conn.commit() self.conn.commit()
Controller().configure().execute() Controller().execute()
self.cursor.execute('SELECT * FROM user ORDER BY foreign_id ASC') self.cursor.execute('SELECT * FROM user ORDER BY foreign_id ASC')
for fetched_data_set in self.cursor: for fetched_data_set in self.cursor:
self.assertEqual(fetched_data_set, expected_result.pop(0)) self.assertEqual(fetched_data_set, expected_result.pop(0))
\ No newline at end of file
...@@ -20,7 +20,7 @@ class TestSqlQueryJob(TestBaseClass): ...@@ -20,7 +20,7 @@ class TestSqlQueryJob(TestBaseClass):
def test_job_creates_table_and_inserts_data_as_configured(self): def test_job_creates_table_and_inserts_data_as_configured(self):
Controller().configure().execute() Controller().execute()
self.cursor.execute('SELECT * FROM user') self.cursor.execute('SELECT * FROM user')
for result in self.cursor: for result in self.cursor:
pass pass
......
...@@ -23,7 +23,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass): ...@@ -23,7 +23,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
def test_batch_run_inserts_right_amount_of_data_sets(self): def test_batch_run_inserts_right_amount_of_data_sets(self):
Controller().configure().execute() Controller().execute()
self.cursor.execute('SELECT * FROM user') self.cursor.execute('SELECT * FROM user')
data_set_count = 0 data_set_count = 0
for _ in self.cursor: for _ in self.cursor:
...@@ -34,7 +34,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass): ...@@ -34,7 +34,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
csv_file = open(self.test_working_directory + '/data/person.csv', 'w') csv_file = open(self.test_working_directory + '/data/person.csv', 'w')
csv_file.write('p_num,name,retired,telephone\n100,Hanae Ferguson,1,67466035') csv_file.write('p_num,name,retired,telephone\n100,Hanae Ferguson,1,67466035')
csv_file.close() csv_file.close()
Controller().configure().execute() Controller().execute()
self.cursor.execute('SELECT * FROM user') self.cursor.execute('SELECT * FROM user')
self.assertEqual(self.cursor.fetchone(), (1, u'Hanae Ferguson', 1, 100, 67466035)) self.assertEqual(self.cursor.fetchone(), (1, u'Hanae Ferguson', 1, 100, 67466035))
...@@ -45,7 +45,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass): ...@@ -45,7 +45,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)') self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)')
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(101, 1, \'Willi Williams\', 67466035,21)') self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(101, 1, \'Willi Williams\', 67466035,21)')
self.conn.commit() self.conn.commit()
Controller().configure().execute() Controller().execute()
self.cursor.execute('SELECT * FROM user') self.cursor.execute('SELECT * FROM user')
self.assertEqual(self.cursor.fetchone(), (20, u'Hanna Ferguson', 0, 100, 12345678)) self.assertEqual(self.cursor.fetchone(), (20, u'Hanna Ferguson', 0, 100, 12345678))
self.assertEqual(self.cursor.fetchone(), (21, u'Willy Williams', 0, 101, 87654321)) self.assertEqual(self.cursor.fetchone(), (21, u'Willy Williams', 0, 101, 87654321))
...@@ -56,7 +56,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass): ...@@ -56,7 +56,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
csv_file.close() csv_file.close()
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)') self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)')
self.conn.commit() self.conn.commit()
Controller().configure().execute() Controller().execute()
self.cursor.execute('SELECT * FROM user') self.cursor.execute('SELECT * FROM user')
self.assertEqual(self.cursor.fetchone(), None) self.assertEqual(self.cursor.fetchone(), None)
...@@ -73,7 +73,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass): ...@@ -73,7 +73,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone) VALUES(104, 1, \'Eagan Hutchinson\', 93829462)') self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone) VALUES(104, 1, \'Eagan Hutchinson\', 93829462)')
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone) VALUES(108, 1, \'Rahim Wilson\', 14499323)') self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone) VALUES(108, 1, \'Rahim Wilson\', 14499323)')
self.conn.commit() self.conn.commit()
Controller().configure().execute() Controller().execute()
self.cursor.execute('SELECT * FROM user ORDER BY id ASC') self.cursor.execute('SELECT * FROM user ORDER BY id ASC')
for fetched_data_set in self.cursor: for fetched_data_set in self.cursor:
self.assertEqual(fetched_data_set, expected_result.pop(0)) self.assertEqual(fetched_data_set, expected_result.pop(0))
...@@ -254,15 +254,5 @@ class TestController(unittest.TestCase): ...@@ -254,15 +254,5 @@ class TestController(unittest.TestCase):
chdir_patch.assert_called_once_with('/foo/bar/batz') chdir_patch.assert_called_once_with('/foo/bar/batz')
self.assertEqual(controller.working_directory, '/foo/bar/batz') self.assertEqual(controller.working_directory, '/foo/bar/batz')
@patch('os.chdir')
def test_configure_reads_configuration_and_fetches_objects(self, chdir_patch):
controller = Controller()
controller._configuration_parser = Mock(spec = ConfigurationParser)
job = Mock(spec = jobs.ConnectorJob)
m_jobs = [job]
controller._configuration_parser.parse.return_value = m_jobs
controller.configure()
self.assertEqual(controller.jobs[0], job)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -107,28 +107,44 @@ class ContainerTest(unittest.TestCase): ...@@ -107,28 +107,44 @@ class ContainerTest(unittest.TestCase):
self.assertEqual(self.container.map(record, True), expected) self.assertEqual(self.container.map(record, True), expected)
def test_container_collision_handling(self): def test_container_collision_handling(self):
record = Mock(spec = Record) join_identifier = ['foo']
self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, record) properties = {'foo': 'string', 'bar': 'int'}
containers = Mock()
containers.properties = properties
container_fields = ['foo']
mapped_record = Mock(spec = Record)
second_container = Mock(spec = Container)
self.container._create_collision_record = mapped_record
self.container._create_collision_record.return_value = mapped_record
self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.container.collision_handling = COLLISION_HANDLING_BREAKCONTAINER self.container.collision_handling = COLLISION_HANDLING_BREAKCONTAINER
self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, record) self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.container.collision_handling = COLLISION_HANDLING_BREAKJOIN self.container.collision_handling = COLLISION_HANDLING_BREAKJOIN
self.container.container_collide(record) self.container.container_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.assertEquals(self.container.collisions[0], record) self.assertEquals(self.container.collisions[0], mapped_record)
self.container.collision_handling = COLLISION_HANDLING_BREAKNEVER self.container.collision_handling = COLLISION_HANDLING_BREAKNEVER
self.container.container_collide(record) self.container.container_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.assertEquals(self.container.collisions[1], record) self.assertEquals(self.container.collisions[1], mapped_record)
def test_container_join_handling(self): def test_container_join_handling(self):
record = Mock(spec = Record) join_identifier = ['foo']
self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, record) properties = {'foo': 'string', 'bar': 'int'}
containers = Mock()
containers.properties = properties
container_fields = ['foo']
mapped_record = Mock(spec = Record)
second_container = Mock(spec = Container)
self.container._create_collision_record = mapped_record
self.container._create_collision_record.return_value = mapped_record
self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.container.collision_handling = COLLISION_HANDLING_BREAKJOIN self.container.collision_handling = COLLISION_HANDLING_BREAKJOIN
self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, record) self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.container.collision_handling = COLLISION_HANDLING_BREAKCONTAINER self.container.collision_handling = COLLISION_HANDLING_BREAKCONTAINER
self.container.join_collide(record) self.container.join_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.assertEquals(self.container.join_collisions[0], record) self.assertEquals(self.container.join_collisions[0], mapped_record)
self.container.collision_handling = COLLISION_HANDLING_BREAKNEVER self.container.collision_handling = COLLISION_HANDLING_BREAKNEVER
self.container.join_collide(record) self.container.join_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
self.assertEquals(self.container.join_collisions[1], record) self.assertEquals(self.container.join_collisions[1], mapped_record)
class PropertyConverterTest(unittest.TestCase): class PropertyConverterTest(unittest.TestCase):
......
...@@ -118,6 +118,7 @@ class TestSqlAlchemyLoader(unittest.TestCase): ...@@ -118,6 +118,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
self.result.insert[0].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][0]) self.result.insert[0].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][0])
self.result.insert[1].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][1]) self.result.insert[1].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][1])
self.result.insert[2].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][2]) self.result.insert[2].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][2])
self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
self.etl_loader._insert(self.sql_connection, self.result.insert) self.etl_loader._insert(self.sql_connection, self.result.insert)
self.table_mock.insert.assert_called_once_with() self.table_mock.insert.assert_called_once_with()
self.sql_connection.execute.assert_called_once_with(insert_statement, test.fixtures['loader_test_loading']) self.sql_connection.execute.assert_called_once_with(insert_statement, test.fixtures['loader_test_loading'])
...@@ -140,6 +141,7 @@ class TestSqlAlchemyLoader(unittest.TestCase): ...@@ -140,6 +141,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
self.result.update[0].to_dict = Mock(return_value = fixture[0]) self.result.update[0].to_dict = Mock(return_value = fixture[0])
self.result.update[1].to_dict = Mock(return_value = fixture[1]) self.result.update[1].to_dict = Mock(return_value = fixture[1])
self.result.update[2].to_dict = Mock(return_value = fixture[2]) self.result.update[2].to_dict = Mock(return_value = fixture[2])
self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
self.etl_loader._update(self.sql_connection, self.result.update) self.etl_loader._update(self.sql_connection, self.result.update)
self.table_mock.update.assert_has_calls([call(values = fixture[0]), self.table_mock.update.assert_has_calls([call(values = fixture[0]),
call(values = fixture[1]), call(values = fixture[1]),
...@@ -167,6 +169,7 @@ class TestSqlAlchemyLoader(unittest.TestCase): ...@@ -167,6 +169,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
self.result.delete[0].get_container_identifier = Mock(return_value = fixture[0]) self.result.delete[0].get_container_identifier = Mock(return_value = fixture[0])
self.result.delete[1].get_container_identifier = Mock(return_value = fixture[1]) self.result.delete[1].get_container_identifier = Mock(return_value = fixture[1])
self.result.delete[2].get_container_identifier = Mock(return_value = fixture[2]) self.result.delete[2].get_container_identifier = Mock(return_value = fixture[2])
self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
self.etl_loader._delete(self.sql_connection, self.result.delete) self.etl_loader._delete(self.sql_connection, self.result.delete)
self.table_mock.delete.assert_has_calls([call(), call(), call()]) self.table_mock.delete.assert_has_calls([call(), call(), call()])
where_calls = [call(table_delete_side_effect[0], self.result.delete[0]), where_calls = [call(table_delete_side_effect[0], self.result.delete[0]),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment