Skip to content
Snippets Groups Projects
Commit 89752c6c authored by Dennis Ahrens's avatar Dennis Ahrens
Browse files

[BUGFIX] SQL loader works correctly.

Insert added empty record if there was no data supplied and update and delete
had a bug related to the constraits creation.
parent 9f4eeb4c
No related branches found
No related tags found
No related merge requests found
...@@ -217,6 +217,7 @@ class SqlAlchemyLoader(AbstractLoader): ...@@ -217,6 +217,7 @@ class SqlAlchemyLoader(AbstractLoader):
def _insert(self, connection, data): def _insert(self, connection, data):
'''Creates a sql insert statement using sqlalchemy and executes it on the connection.''' '''Creates a sql insert statement using sqlalchemy and executes it on the connection.'''
if len(data) == 0: return
values = [] values = []
for record in data: for record in data:
values.append(record.to_dict(remove_system_identifiers = False)) values.append(record.to_dict(remove_system_identifiers = False))
...@@ -228,7 +229,7 @@ class SqlAlchemyLoader(AbstractLoader): ...@@ -228,7 +229,7 @@ class SqlAlchemyLoader(AbstractLoader):
'''Creates a sql update statement using sqlalchemy and executes it on the connection.''' '''Creates a sql update statement using sqlalchemy and executes it on the connection.'''
for record in data: for record in data:
statement = self.table.update(values = record.to_dict()) statement = self.table.update(values = record.to_dict())
self._add_where_constraints(statement, record) statement = self._add_where_constraints(statement, record)
logging.debug('executing sql update query: ' + str(statement)) logging.debug('executing sql update query: ' + str(statement))
connection.execute(statement) connection.execute(statement)
...@@ -237,7 +238,7 @@ class SqlAlchemyLoader(AbstractLoader): ...@@ -237,7 +238,7 @@ class SqlAlchemyLoader(AbstractLoader):
'''Creates a sql delete statement using sqlalchemy and executes it on the connection.''' '''Creates a sql delete statement using sqlalchemy and executes it on the connection.'''
for record in data: for record in data:
statement = self.table.delete() statement = self.table.delete()
self._add_where_constraints(statement, record) statement = self._add_where_constraints(statement, record)
logging.debug('executing sql delete query: ' + str(statement)) logging.debug('executing sql delete query: ' + str(statement))
connection.execute(statement) connection.execute(statement)
...@@ -256,6 +257,7 @@ class SqlAlchemyLoader(AbstractLoader): ...@@ -256,6 +257,7 @@ class SqlAlchemyLoader(AbstractLoader):
statement = statement.where(self.table.c.get(column) == value) statement = statement.where(self.table.c.get(column) == value)
else: else:
raise ConfigurationException('Value for identifier \'' + str(column) + '\' empty.') raise ConfigurationException('Value for identifier \'' + str(column) + '\' empty.')
return statement
@yamlify @yamlify
......
...@@ -11,9 +11,10 @@ class TestSyncFileToSQLiteSync(TestBaseClass): ...@@ -11,9 +11,10 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
super(TestSyncFileToSQLiteSync, self).setUp() super(TestSyncFileToSQLiteSync, self).setUp()
self.conn = sqlite3.connect(self.test_working_directory + 'person.sqlite3') self.conn = sqlite3.connect(self.test_working_directory + 'person.sqlite3')
self.cursor = self.conn.cursor() self.cursor = self.conn.cursor()
self.cursor.execute('DROP TABLE IF EXISTS user;')
self.cursor.execute('CREATE TABLE user (id INTEGER PRIMARY KEY, name VARCHAR(50), inactive BOOL, foreign_id INT, telephone INT);') self.cursor.execute('CREATE TABLE user (id INTEGER PRIMARY KEY, name VARCHAR(50), inactive BOOL, foreign_id INT, telephone INT);')
self.conn.commit() self.conn.commit()
sys.argv = ['hshetl', '-d', self.test_working_directory] sys.argv = ['hshetl', '-v', '-d', self.test_working_directory]
def tearDown(self): def tearDown(self):
super(TestSyncFileToSQLiteSync, self).tearDown() super(TestSyncFileToSQLiteSync, self).tearDown()
...@@ -38,13 +39,15 @@ class TestSyncFileToSQLiteSync(TestBaseClass): ...@@ -38,13 +39,15 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
def test_sync_updates_outdated_data_set(self): def test_sync_updates_outdated_data_set(self):
csv_file = open(self.test_working_directory + 'data/person.csv', 'w') csv_file = open(self.test_working_directory + 'data/person.csv', 'w')
csv_file.write('p_num,name,retired,telephone\n100,Hanna Ferguson,0,12345678') csv_file.write('p_num,name,retired,telephone\n100,Hanna Ferguson,0,12345678\n101,Willy Williams,0,87654321')
csv_file.close() csv_file.close()
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)') self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(100, 1, \'Hanae Ferguson\', 67466035,20)')
self.cursor.execute('INSERT INTO user (foreign_id,inactive,name,telephone,id) VALUES(101, 1, \'Willi Williams\', 67466035,21)')
self.conn.commit() self.conn.commit()
Controller().configure().execute() Controller().configure().execute()
self.cursor.execute('SELECT * FROM user') self.cursor.execute('SELECT * FROM user')
self.assertEqual(self.cursor.fetchone(), (20, u'Hanna Ferguson', 0, 100, 12345678)) self.assertEqual(self.cursor.fetchone(), (20, u'Hanna Ferguson', 0, 100, 12345678))
self.assertEqual(self.cursor.fetchone(), (21, u'Willy Williams', 0, 101, 87654321))
def test_sync_deletes_outdated_data_set(self): def test_sync_deletes_outdated_data_set(self):
csv_file = open(self.test_working_directory + 'data/person.csv', 'w') csv_file = open(self.test_working_directory + 'data/person.csv', 'w')
...@@ -54,7 +57,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass): ...@@ -54,7 +57,7 @@ class TestSyncFileToSQLiteSync(TestBaseClass):
self.conn.commit() self.conn.commit()
Controller().configure().execute() Controller().configure().execute()
self.cursor.execute('SELECT * FROM user') self.cursor.execute('SELECT * FROM user')
self.assertEqual(self.cursor.fetchone(), None, 'A data set that does not exist in the source system was not deleted.') self.assertEqual(self.cursor.fetchone(), None)
def test_sync_handles_insert_update_and_delete_simultaneously(self): def test_sync_handles_insert_update_and_delete_simultaneously(self):
expected_result = [(1, u'Hanae Ferguson', 1, 100, 67466035), expected_result = [(1, u'Hanae Ferguson', 1, 100, 67466035),
......
...@@ -131,7 +131,7 @@ class TestSqlAlchemyLoader(unittest.TestCase): ...@@ -131,7 +131,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
Mock(spec = sqlalchemy.sql.expression.Update), Mock(spec = sqlalchemy.sql.expression.Update),
Mock(spec = sqlalchemy.sql.expression.Update)] Mock(spec = sqlalchemy.sql.expression.Update)]
self.table_mock.update = Mock(side_effect = table_update_side_effect) self.table_mock.update = Mock(side_effect = table_update_side_effect)
self.etl_loader._add_where_constraints = Mock() self.etl_loader._add_where_constraints = Mock(side_effect = table_update_side_effect)
fixture = copy.deepcopy(test.fixtures['loader_test_loading']) fixture = copy.deepcopy(test.fixtures['loader_test_loading'])
del fixture[0]['Name'] del fixture[0]['Name']
del fixture[1]['Name'] del fixture[1]['Name']
...@@ -161,7 +161,7 @@ class TestSqlAlchemyLoader(unittest.TestCase): ...@@ -161,7 +161,7 @@ class TestSqlAlchemyLoader(unittest.TestCase):
Mock(spec = sqlalchemy.sql.expression.Delete), Mock(spec = sqlalchemy.sql.expression.Delete),
Mock(spec = sqlalchemy.sql.expression.Delete)] Mock(spec = sqlalchemy.sql.expression.Delete)]
self.table_mock.delete = Mock(side_effect = table_delete_side_effect) self.table_mock.delete = Mock(side_effect = table_delete_side_effect)
self.etl_loader._add_where_constraints = Mock() self.etl_loader._add_where_constraints = Mock(side_effect = table_delete_side_effect)
fixture = copy.deepcopy(test.fixtures['loader_test_loading']) fixture = copy.deepcopy(test.fixtures['loader_test_loading'])
self.result.delete[0].get_container_identifier = Mock(return_value = fixture[0]) self.result.delete[0].get_container_identifier = Mock(return_value = fixture[0])
self.result.delete[1].get_container_identifier = Mock(return_value = fixture[1]) self.result.delete[1].get_container_identifier = Mock(return_value = fixture[1])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment