From cb80a966b129bc1a3527ff862ed5e95d957cb252 Mon Sep 17 00:00:00 2001
From: cwy-p8d-u1 <sven-ove.haensel@stud.hs-hannover.de>
Date: Thu, 15 Feb 2024 10:19:39 +0100
Subject: [PATCH] fix publisher and subscriber, new paho-mqtt version

---
 .../queries/cypher/driver_latest/cypher_queries.py    | 11 +++++++++--
 code/infrastructure/streaming/clients/pub/pub_cdm.py  |  2 +-
 .../streaming/clients/sub/memgraph/sub_mem.py         |  2 +-
 .../streaming/clients/sub/neo4j/sub_neo4j_cdm.py      |  2 +-
 .../streaming/clients/sub/ongdb/sub_ongdb.py          |  2 +-
 .../streaming/clients/sub/postgres/sub_pg_cdm.py      |  2 +-
 6 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/code/eval/queries/cypher/driver_latest/cypher_queries.py b/code/eval/queries/cypher/driver_latest/cypher_queries.py
index 5982306..1c0c560 100644
--- a/code/eval/queries/cypher/driver_latest/cypher_queries.py
+++ b/code/eval/queries/cypher/driver_latest/cypher_queries.py
@@ -66,16 +66,23 @@ def execute_and_log_query(query_key, query):
     start_time = time.time()
     with driver.session() as session:
         result = session.run(query)
+        row_count= len(result.data())
         summary = result.consume()
         end_time = time.time()
 
     # Check if the attributes are None and set them to 0 if they are
     result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
     result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
+    row_count = row_count if row_count is not None else 0 
     execution_time = result_available_after + result_consumed_after
 
     logging.info(f"Results for Query {query_key}:")
     logging.info(f"Python executed in: {end_time - start_time}s")
+    
+    if row_count == 0:
+        logging.info(f"Number of rows not available")
+    else:
+        logging.info(f"Number of rows {row_count}")
     if execution_time == 0:
         logging.warning(f"No internal DBMS metric available")
     else:
@@ -86,8 +93,8 @@ def execute_and_log_query(query_key, query):
     # Write a header if the file is newly created or empty, else append the data
     csv_log_file.seek(0, 2)  # Move the cursor to the end of the file
     if csv_log_file.tell() == 0:  # If file is empty, write a header
-        writer.writerow(['Query Key', 'Start Time', 'End Time', 'Execution Time (ms)', 'Total Time (s)'])
-    writer.writerow([query_key, start_time, end_time, execution_time, end_time - start_time])
+        writer.writerow(['Query Key', 'Start Time', 'End Time','Fetched nodes', 'Execution Time (ms)', 'Total Time (s)'])
+    writer.writerow([query_key, start_time, end_time, row_count, execution_time, end_time - start_time])
 
 # Function to schedule and execute all queries
 def schedule_and_execute_queries():
diff --git a/code/infrastructure/streaming/clients/pub/pub_cdm.py b/code/infrastructure/streaming/clients/pub/pub_cdm.py
index 633876a..5c38e5a 100755
--- a/code/infrastructure/streaming/clients/pub/pub_cdm.py
+++ b/code/infrastructure/streaming/clients/pub/pub_cdm.py
@@ -23,7 +23,7 @@ files = [
     'ta1-cadets-e3-official-2_1.json'
     ]
 line_count = [4999999,4999999,3911712,4999999,4999999,4999999,4999999,4999999,2059063,4999999,34335661] # line_count corresponding to each file
-client = mqtt.Client("Client1")
+client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client1")
 topic = "neo4j"
 
 def read_moving_window_and_send_data(file_path, lines_per_window, line_count):
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/sub_mem.py b/code/infrastructure/streaming/clients/sub/memgraph/sub_mem.py
index dc45de4..e0b4d41 100644
--- a/code/infrastructure/streaming/clients/sub/memgraph/sub_mem.py
+++ b/code/infrastructure/streaming/clients/sub/memgraph/sub_mem.py
@@ -6,7 +6,7 @@ import os
 
 broker_hostname=str(os.getenv('mos_host',default="localhost"))
 broker_port = int(os.getenv('mos_port',default=1883)) 
-client = mqtt.Client("Client4")
+client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client4")
 db_uri =str(os.getenv('mem_host',default="bolt://localhost:7687"))
 neo4j_auth=("","")
 abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
diff --git a/code/infrastructure/streaming/clients/sub/neo4j/sub_neo4j_cdm.py b/code/infrastructure/streaming/clients/sub/neo4j/sub_neo4j_cdm.py
index bb04283..99b5aab 100755
--- a/code/infrastructure/streaming/clients/sub/neo4j/sub_neo4j_cdm.py
+++ b/code/infrastructure/streaming/clients/sub/neo4j/sub_neo4j_cdm.py
@@ -6,7 +6,7 @@ import os
 
 broker_hostname=str(os.getenv('mos_host',default="localhost"))
 port = int(os.getenv('mos_port',default=1883)) 
-client = mqtt.Client("Client2")
+client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client2")
 neo4j_uri =str(os.getenv('neo4j_host',default="bolt://localhost:7687"))
 neo4j_auth=None
 abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
diff --git a/code/infrastructure/streaming/clients/sub/ongdb/sub_ongdb.py b/code/infrastructure/streaming/clients/sub/ongdb/sub_ongdb.py
index ab57d10..ade6a32 100644
--- a/code/infrastructure/streaming/clients/sub/ongdb/sub_ongdb.py
+++ b/code/infrastructure/streaming/clients/sub/ongdb/sub_ongdb.py
@@ -6,7 +6,7 @@ import os
 
 broker_hostname=str(os.getenv('mos_host',default="localhost"))
 broker_port = int(os.getenv('mos_port',default=1883)) 
-client = mqtt.Client("Client5")
+client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client5")
 db_uri =str(os.getenv('db_host',default="bolt://localhost:7687"))
 neo4j_auth=("","")
 abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
diff --git a/code/infrastructure/streaming/clients/sub/postgres/sub_pg_cdm.py b/code/infrastructure/streaming/clients/sub/postgres/sub_pg_cdm.py
index eb02587..6298d77 100755
--- a/code/infrastructure/streaming/clients/sub/postgres/sub_pg_cdm.py
+++ b/code/infrastructure/streaming/clients/sub/postgres/sub_pg_cdm.py
@@ -13,7 +13,7 @@ broker_hostname=str(os.getenv('mos_host',default="localhost"))
 broker_port = int(os.getenv('mos_port',default=1883)) 
 file_path = str(os.getenv('path_sql_script',default='C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\sub\\postgres\\import_node_edge.txt'))
 
-client = mqtt.Client("Client3")
+client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client3")
 abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
 
 def flatten_obj(key, val, target):
-- 
GitLab