diff --git a/code/eval/queries/cypher/driver_latest/cypher_queries.py b/code/eval/queries/cypher/driver_latest/cypher_queries.py index 5982306ecf9c542e4fd36256e5ef817c69bf5885..1c0c56015181e2382e26c6e3ab65c0f275fef610 100644 --- a/code/eval/queries/cypher/driver_latest/cypher_queries.py +++ b/code/eval/queries/cypher/driver_latest/cypher_queries.py @@ -66,16 +66,23 @@ def execute_and_log_query(query_key, query): start_time = time.time() with driver.session() as session: result = session.run(query) + row_count= len(result.data()) summary = result.consume() end_time = time.time() # Check if the attributes are None and set them to 0 if they are result_available_after = summary.result_available_after if summary.result_available_after is not None else 0 result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0 + row_count = row_count if row_count is not None else 0 execution_time = result_available_after + result_consumed_after logging.info(f"Results for Query {query_key}:") logging.info(f"Python executed in: {end_time - start_time}s") + + if row_count == 0: + logging.info(f"Number of rows not available") + else: + logging.info(f"Number of rows {row_count}") if execution_time == 0: logging.warning(f"No internal DBMS metric available") else: @@ -86,8 +93,8 @@ def execute_and_log_query(query_key, query): # Write a header if the file is newly created or empty, else append the data csv_log_file.seek(0, 2) # Move the cursor to the end of the file if csv_log_file.tell() == 0: # If file is empty, write a header - writer.writerow(['Query Key', 'Start Time', 'End Time', 'Execution Time (ms)', 'Total Time (s)']) - writer.writerow([query_key, start_time, end_time, execution_time, end_time - start_time]) + writer.writerow(['Query Key', 'Start Time', 'End Time','Fetched nodes', 'Execution Time (ms)', 'Total Time (s)']) + writer.writerow([query_key, start_time, end_time, row_count, execution_time, end_time - start_time]) # Function to schedule and execute all queries def schedule_and_execute_queries(): diff --git a/code/infrastructure/streaming/clients/pub/pub_cdm.py b/code/infrastructure/streaming/clients/pub/pub_cdm.py index 633876a7120edcbf2ca4075f59ef068eeeaeb662..5c38e5a51cace74c166a2eb83bd9fd9a0fd9b8ac 100755 --- a/code/infrastructure/streaming/clients/pub/pub_cdm.py +++ b/code/infrastructure/streaming/clients/pub/pub_cdm.py @@ -23,7 +23,7 @@ files = [ 'ta1-cadets-e3-official-2_1.json' ] line_count = [4999999,4999999,3911712,4999999,4999999,4999999,4999999,4999999,2059063,4999999,34335661] # line_count corresponding to each file -client = mqtt.Client("Client1") +client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client1") topic = "neo4j" def read_moving_window_and_send_data(file_path, lines_per_window, line_count): diff --git a/code/infrastructure/streaming/clients/sub/memgraph/sub_mem.py b/code/infrastructure/streaming/clients/sub/memgraph/sub_mem.py index dc45de41428d9b6d7b0f2806e8d85bf2873f6bcd..e0b4d416fa7dda5f4f437c6134516e42a1fb7853 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/sub_mem.py +++ b/code/infrastructure/streaming/clients/sub/memgraph/sub_mem.py @@ -6,7 +6,7 @@ import os broker_hostname=str(os.getenv('mos_host',default="localhost")) broker_port = int(os.getenv('mos_port',default=1883)) -client = mqtt.Client("Client4") +client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client4") db_uri =str(os.getenv('mem_host',default="bolt://localhost:7687")) neo4j_auth=("","") abort_time_limit = int(os.getenv('abort_time_limit', default=99999)) diff --git a/code/infrastructure/streaming/clients/sub/neo4j/sub_neo4j_cdm.py b/code/infrastructure/streaming/clients/sub/neo4j/sub_neo4j_cdm.py index bb04283df6a777a93ce314bcb2c6e977140a0f74..99b5aabb7a05e3ac0fbbcba13af32f434402d64d 100755 --- a/code/infrastructure/streaming/clients/sub/neo4j/sub_neo4j_cdm.py +++ b/code/infrastructure/streaming/clients/sub/neo4j/sub_neo4j_cdm.py @@ -6,7 +6,7 @@ import os broker_hostname=str(os.getenv('mos_host',default="localhost")) port = int(os.getenv('mos_port',default=1883)) -client = mqtt.Client("Client2") +client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client2") neo4j_uri =str(os.getenv('neo4j_host',default="bolt://localhost:7687")) neo4j_auth=None abort_time_limit = int(os.getenv('abort_time_limit', default=99999)) diff --git a/code/infrastructure/streaming/clients/sub/ongdb/sub_ongdb.py b/code/infrastructure/streaming/clients/sub/ongdb/sub_ongdb.py index ab57d101c7f226c1efdb7c97f9ffa47ff10f9b95..ade6a327d879d2a12e6182cfc13c31fff54d73fe 100644 --- a/code/infrastructure/streaming/clients/sub/ongdb/sub_ongdb.py +++ b/code/infrastructure/streaming/clients/sub/ongdb/sub_ongdb.py @@ -6,7 +6,7 @@ import os broker_hostname=str(os.getenv('mos_host',default="localhost")) broker_port = int(os.getenv('mos_port',default=1883)) -client = mqtt.Client("Client5") +client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client5") db_uri =str(os.getenv('db_host',default="bolt://localhost:7687")) neo4j_auth=("","") abort_time_limit = int(os.getenv('abort_time_limit', default=99999)) diff --git a/code/infrastructure/streaming/clients/sub/postgres/sub_pg_cdm.py b/code/infrastructure/streaming/clients/sub/postgres/sub_pg_cdm.py index eb02587e15079374918160a17d70c0a1b4bc7e8a..6298d77f3df63f2a09e8f7cd17fcf09585ee5ad3 100755 --- a/code/infrastructure/streaming/clients/sub/postgres/sub_pg_cdm.py +++ b/code/infrastructure/streaming/clients/sub/postgres/sub_pg_cdm.py @@ -13,7 +13,7 @@ broker_hostname=str(os.getenv('mos_host',default="localhost")) broker_port = int(os.getenv('mos_port',default=1883)) file_path = str(os.getenv('path_sql_script',default='C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\sub\\postgres\\import_node_edge.txt')) -client = mqtt.Client("Client3") +client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client3") abort_time_limit = int(os.getenv('abort_time_limit', default=99999)) def flatten_obj(key, val, target):