From c6faa4014d82f995e45c94fd3f5a41730fa5f0a9 Mon Sep 17 00:00:00 2001
From: Maxi Schulz <maximilian.schulz@hs-hannover.de>
Date: Thu, 9 Oct 2014 16:20:58 +0200
Subject: [PATCH] [TASK] PID handling improved

Before this commit, there could be a pid file present
even if there was no more hshetl process running. This
was due to the pid file handling being done before the config
validation. So if hshetl stopped because of config
problems the pid file would not be deleted.
Also fixes the unittests. (copy_faker2file fails)
---
 hshetl/cli.py                                 | 35 +++++++--------
 hshetl/extractors.py                          |  2 +-
 hshetl/test/functional/test_copy.py           |  2 +-
 .../test/functional/test_copy_faker2file.py   |  2 +-
 hshetl/test/functional/test_copy_file2file.py |  2 +-
 ...eetl_file2sqliteWithMultipleIdentifiers.py |  2 +-
 .../functional/test_query_sqlquery2sqlite.py  |  2 +-
 .../test/functional/test_sync_file2sqlite.py  | 10 ++---
 hshetl/test/unit/test_cli.py                  | 10 -----
 hshetl/test/unit/test_entities.py             | 44 +++++++++++++------
 hshetl/test/unit/test_loaders.py              |  3 ++
 11 files changed, 59 insertions(+), 55 deletions(-)

diff --git a/hshetl/cli.py b/hshetl/cli.py
index 3e5bb02..31ecf18 100644
--- a/hshetl/cli.py
+++ b/hshetl/cli.py
@@ -163,28 +163,15 @@ class Controller(object):
         '''Resolves the name of the pid file related to this config.'''
         return self.abs_config_file + '.pid'
 
-    def configure(self):
-        '''Reads the yaml configuration from file and sets internal variables.'''
-        try:
-            self.check_pid_file()
-            self.jobs.extend(self._configuration_parser.parse(self.abs_config_file))
-        except IOError as ioe:
-            sys.exit('Can\'t read configuration file: {}'.format(str(ioe)))
-        except yaml.parser.ParserError as pe:
-            sys.exit('Error while parsing YAML configuration: {}'.format(str(pe)))
-        except ConfigurationException as ce:
-            sys.exit('Error in configuration: {}'.format(str(ce)))
-        except DuplicatedProcessException as dpe:
-            sys.exit('Error on execution: {}'.format(str(dpe)))
-        return self
-
     def execute(self):
         '''Build job objects from configuration and execute them.'''
-        if self.interactive:
-            self.make_interactive(self.jobs)
-        if self.dry:
-            self.make_dry(self.jobs)
         try:
+            self.check_pid_file()
+            self.jobs.extend(self._configuration_parser.parse(self.abs_config_file))
+            if self.interactive:
+                self.make_interactive(self.jobs)
+            if self.dry:
+                self.make_dry(self.jobs)
             if self.explain:
                 self.explain_jobs()
             else:
@@ -195,6 +182,14 @@ class Controller(object):
                     self.jobs.execute()
         except KeyboardInterrupt:
             logging.critical('The execution was manually interrupted.')
+        except DuplicatedProcessException as dpe:
+            logging.critical('Error on execution: {}'.format(str(dpe)))
+        except IOError as ioe:
+            logging.critical('Can\'t read configuration file: {}'.format(str(ioe)))
+        except yaml.parser.ParserError as pe:
+            logging.critical('Error while parsing YAML configuration: {}'.format(str(pe)))
+        except ConfigurationException as ce:
+            logging.critical('Error in configuration: {}'.format(str(ce)))
         finally:
             self.shutdown()
         logging.info('The whole execution took {} seconds'.format(str(time.time() - self.start_time)))
@@ -251,7 +246,7 @@ class Controller(object):
         shutil.copy(example_config, self.working_directory)
 
 def main():
-    Controller().configure().execute()
+    Controller().execute()
 
 if __name__ == "__main__":
     main()
diff --git a/hshetl/extractors.py b/hshetl/extractors.py
index a5bcdc0..16a208f 100644
--- a/hshetl/extractors.py
+++ b/hshetl/extractors.py
@@ -86,7 +86,7 @@ class AbstractExtractor(object):
         raise NotImplementedError
 
     def __unicode__(self):
-        return u'<AbstractExtractor with connector: %s>' % self.connector.name
+        return u'<AbstractExtractor with connector: %s>' % self.connector.name if self.connector is not None else 'Fake'
 
 
 @yamlify
diff --git a/hshetl/test/functional/test_copy.py b/hshetl/test/functional/test_copy.py
index b922c52..9643644 100644
--- a/hshetl/test/functional/test_copy.py
+++ b/hshetl/test/functional/test_copy.py
@@ -15,5 +15,5 @@ class TestCopyJob(TestBaseClass):
 
 
     def test_job(self):
-        Controller().configure().execute()
+        Controller().execute()
         self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_person.csv', self.test_working_directory + 'person.csv'))
diff --git a/hshetl/test/functional/test_copy_faker2file.py b/hshetl/test/functional/test_copy_faker2file.py
index 1ee80ec..b4ff2f5 100644
--- a/hshetl/test/functional/test_copy_faker2file.py
+++ b/hshetl/test/functional/test_copy_faker2file.py
@@ -15,5 +15,5 @@ class TestFakerCopyJob(TestBaseClass):
 
 
     def test_job(self):
-        Controller().configure().execute()
+        Controller().execute()
         self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_fakeperson.csv', self.test_working_directory + 'fakeperson.csv'))
diff --git a/hshetl/test/functional/test_copy_file2file.py b/hshetl/test/functional/test_copy_file2file.py
index ae7fdaf..ccc0236 100644
--- a/hshetl/test/functional/test_copy_file2file.py
+++ b/hshetl/test/functional/test_copy_file2file.py
@@ -15,5 +15,5 @@ class TestCopyFileJob(TestBaseClass):
 
 
     def test_job(self):
-        Controller().configure().execute()
+        Controller().execute()
         self.assertTrue(filecmp.cmp(self.test_working_directory + 'data/expected_output.csv', self.test_working_directory + 'output.csv'))
diff --git a/hshetl/test/functional/test_eetl_file2sqliteWithMultipleIdentifiers.py b/hshetl/test/functional/test_eetl_file2sqliteWithMultipleIdentifiers.py
index 929c99e..455f7f1 100644
--- a/hshetl/test/functional/test_eetl_file2sqliteWithMultipleIdentifiers.py
+++ b/hshetl/test/functional/test_eetl_file2sqliteWithMultipleIdentifiers.py
@@ -32,7 +32,7 @@ class TestFileToSQLiteWithMultipleIdentifiers(TestBaseClass):
         self.cursor.execute('INSERT INTO user (foreign_id,inactive,first_name,name,telephone) VALUES (104, 1, \'Eagan\', \'Hutchinson\', 93829462)')
         self.cursor.execute('INSERT INTO user (foreign_id,inactive,first_name,name,telephone) VALUES (108, 1, \'Rahim\', \'Wilson\', 14499323)')
         self.conn.commit()
-        Controller().configure().execute()
+        Controller().execute()
         self.cursor.execute('SELECT * FROM user ORDER BY foreign_id ASC')
         for fetched_data_set in self.cursor:
             self.assertEqual(fetched_data_set, expected_result.pop(0))
\ No newline at end of file
diff --git a/hshetl/test/functional/test_query_sqlquery2sqlite.py b/hshetl/test/functional/test_query_sqlquery2sqlite.py
index 312c7ab..0bcf0f9 100644
--- a/hshetl/test/functional/test_query_sqlquery2sqlite.py
+++ b/hshetl/test/functional/test_query_sqlquery2sqlite.py
@@ -20,7 +20,7 @@ class TestSqlQueryJob(TestBaseClass):
 
 
     def test_job_creates_table_and_inserts_data_as_configured(self):
-        Controller().configure().execute()
+        Controller().execute()
         self.cursor.execute('SELECT * FROM user')
         for result in self.cursor:
             pass
diff --git a/hshetl/test/functional/test_sync_file2sqlite.py b/hshetl/test/functional/test_sync_file2sqlite.py
index 8303de2..494cc15 100644
--- a/hshetl/test/functional/test_sync_file2sqlite.py
+++ b/hshetl/test/functional/test_sync_file2sqlite.py
@@ -23,7 +23,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
 
 
     def test_batch_run_inserts_right_amount_of_data_sets(self):
-        Controller().configure().execute()
+        Controller().execute()
         self.cursor.execute('SELECT * FROM user')
         data_set_count = 0
         for _ in self.cursor:
@@ -34,7 +34,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
         csv_file = open(self.test_working_directory + '/data/person.csv', 'w')
         csv_file.write('p_num,name,retired,telephone\n100,Hanae Ferguson,1,67466035')
         csv_file.close()
-        Controller().configure().execute()
+        Controller().execute()
         self.cursor.execute('SELECT * FROM user')
         self.assertEqual(self.cursor.fetchone(), (1, u'Hanae Ferguson', 1, 100, 67466035))
 
@@ -45,7 +45,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
         self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)')
         self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(101, 1, \'Willi Williams\', 67466035,21)')
         self.conn.commit()
-        Controller().configure().execute()
+        Controller().execute()
         self.cursor.execute('SELECT * FROM user')
         self.assertEqual(self.cursor.fetchone(), (20, u'Hanna Ferguson', 0, 100, 12345678))
         self.assertEqual(self.cursor.fetchone(), (21, u'Willy Williams', 0, 101, 87654321))
@@ -56,7 +56,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
         csv_file.close()
         self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)')
         self.conn.commit()
-        Controller().configure().execute()
+        Controller().execute()
         self.cursor.execute('SELECT * FROM user')
         self.assertEqual(self.cursor.fetchone(), None)
 
@@ -73,7 +73,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
         self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone) VALUES(104, 1, \'Eagan Hutchinson\', 93829462)')
         self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone) VALUES(108, 1, \'Rahim Wilson\', 14499323)')
         self.conn.commit()
-        Controller().configure().execute()
+        Controller().execute()
         self.cursor.execute('SELECT * FROM user ORDER BY id ASC')
         for fetched_data_set in self.cursor:
             self.assertEqual(fetched_data_set, expected_result.pop(0))
diff --git a/hshetl/test/unit/test_cli.py b/hshetl/test/unit/test_cli.py
index 705a38e..9b64166 100644
--- a/hshetl/test/unit/test_cli.py
+++ b/hshetl/test/unit/test_cli.py
@@ -254,15 +254,5 @@ class TestController(unittest.TestCase):
         chdir_patch.assert_called_once_with('/foo/bar/batz')
         self.assertEqual(controller.working_directory, '/foo/bar/batz')
 
-    @patch('os.chdir')
-    def test_configure_reads_configuration_and_fetches_objects(self, chdir_patch):
-        controller = Controller()
-        controller._configuration_parser = Mock(spec = ConfigurationParser)
-        job = Mock(spec = jobs.ConnectorJob)
-        m_jobs = [job]
-        controller._configuration_parser.parse.return_value = m_jobs
-        controller.configure()
-        self.assertEqual(controller.jobs[0], job)
-
 if __name__ == "__main__":
     unittest.main()
diff --git a/hshetl/test/unit/test_entities.py b/hshetl/test/unit/test_entities.py
index 4687bd1..da22ca7 100644
--- a/hshetl/test/unit/test_entities.py
+++ b/hshetl/test/unit/test_entities.py
@@ -107,28 +107,44 @@ class ContainerTest(unittest.TestCase):
         self.assertEqual(self.container.map(record, True), expected)
 
     def test_container_collision_handling(self):
-        record = Mock(spec = Record)
-        self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, record)
+        join_identifier = ['foo']
+        properties = {'foo': 'string', 'bar': 'int'}
+        containers = Mock()
+        containers.properties = properties
+        container_fields = ['foo']
+        mapped_record = Mock(spec = Record)
+        second_container = Mock(spec = Container)
+        self.container._create_collision_record = mapped_record
+        self.container._create_collision_record.return_value = mapped_record
+        self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
         self.container.collision_handling = COLLISION_HANDLING_BREAKCONTAINER
-        self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, record)
+        self.assertRaises(DuplicatedSystemIdException, self.container.container_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
         self.container.collision_handling = COLLISION_HANDLING_BREAKJOIN
-        self.container.container_collide(record)
-        self.assertEquals(self.container.collisions[0], record)
+        self.container.container_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
+        self.assertEquals(self.container.collisions[0], mapped_record)
         self.container.collision_handling = COLLISION_HANDLING_BREAKNEVER
-        self.container.container_collide(record)
-        self.assertEquals(self.container.collisions[1], record)
+        self.container.container_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
+        self.assertEquals(self.container.collisions[1], mapped_record)
 
     def test_container_join_handling(self):
-        record = Mock(spec = Record)
-        self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, record)
+        join_identifier = ['foo']
+        properties = {'foo': 'string', 'bar': 'int'}
+        containers = Mock()
+        containers.properties = properties
+        container_fields = ['foo']
+        mapped_record = Mock(spec = Record)
+        second_container = Mock(spec = Container)
+        self.container._create_collision_record = mapped_record
+        self.container._create_collision_record.return_value = mapped_record
+        self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
         self.container.collision_handling = COLLISION_HANDLING_BREAKJOIN
-        self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, record)
+        self.assertRaises(DuplicatedJoinIdException, self.container.join_collide, join_identifier, properties, containers, container_fields, mapped_record, second_container)
         self.container.collision_handling = COLLISION_HANDLING_BREAKCONTAINER
-        self.container.join_collide(record)
-        self.assertEquals(self.container.join_collisions[0], record)
+        self.container.join_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
+        self.assertEquals(self.container.join_collisions[0], mapped_record)
         self.container.collision_handling = COLLISION_HANDLING_BREAKNEVER
-        self.container.join_collide(record)
-        self.assertEquals(self.container.join_collisions[1], record)
+        self.container.join_collide(join_identifier, properties, containers, container_fields, mapped_record, second_container)
+        self.assertEquals(self.container.join_collisions[1], mapped_record)
 
 
 class PropertyConverterTest(unittest.TestCase):
diff --git a/hshetl/test/unit/test_loaders.py b/hshetl/test/unit/test_loaders.py
index c63cc4d..269e7ca 100644
--- a/hshetl/test/unit/test_loaders.py
+++ b/hshetl/test/unit/test_loaders.py
@@ -118,6 +118,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
         self.result.insert[0].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][0])
         self.result.insert[1].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][1])
         self.result.insert[2].to_dict = Mock(return_value = test.fixtures['loader_test_loading'][2])
+        self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
         self.etl_loader._insert(self.sql_connection, self.result.insert)
         self.table_mock.insert.assert_called_once_with()
         self.sql_connection.execute.assert_called_once_with(insert_statement, test.fixtures['loader_test_loading'])
@@ -140,6 +141,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
         self.result.update[0].to_dict = Mock(return_value = fixture[0])
         self.result.update[1].to_dict = Mock(return_value = fixture[1])
         self.result.update[2].to_dict = Mock(return_value = fixture[2])
+        self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
         self.etl_loader._update(self.sql_connection, self.result.update)
         self.table_mock.update.assert_has_calls([call(values = fixture[0]),
                                                  call(values = fixture[1]),
@@ -167,6 +169,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
         self.result.delete[0].get_container_identifier = Mock(return_value = fixture[0])
         self.result.delete[1].get_container_identifier = Mock(return_value = fixture[1])
         self.result.delete[2].get_container_identifier = Mock(return_value = fixture[2])
+        self.etl_loader._generate_readable_query = Mock(return_value = 'Foo')
         self.etl_loader._delete(self.sql_connection, self.result.delete)
         self.table_mock.delete.assert_has_calls([call(), call(), call()])
         where_calls = [call(table_delete_side_effect[0], self.result.delete[0]),
-- 
GitLab