diff --git a/code/eval/queries/cypher/driver_latest/cypher_queries.py b/code/eval/queries/cypher/driver_latest/cypher_queries.py
index b7b7e790a730522f9e474540096beff94b000ac3..0ebc5629792cf0a4a219dced66f23405f13f2528 100644
--- a/code/eval/queries/cypher/driver_latest/cypher_queries.py
+++ b/code/eval/queries/cypher/driver_latest/cypher_queries.py
@@ -69,8 +69,10 @@ def execute_and_log_query(query_key, query):
         # end_time = time.time()
         row_count = 0
         if query_key == '2-hop':
-            for record in result:
-                row_count += len(record.get('nodes'))
+            for record in result:
+                nodes = record.get('nodes')
+                if nodes is not None: # Check if 'nodes' is not None
+                    row_count += len(nodes)
         else:
             row_count= len(result.data())
         summary = result.consume()
diff --git a/code/eval/queries/cypher/memgraph/Dockerfile b/code/eval/queries/cypher/memgraph/Dockerfile
index 5fefbd1c3a953846b01acd43698670c825d2e24e..32cf2beed4df4d3b368d21ab8f098f193f16bb2d 100644
--- a/code/eval/queries/cypher/memgraph/Dockerfile
+++ b/code/eval/queries/cypher/memgraph/Dockerfile
@@ -11,7 +11,7 @@ RUN pip install schedule
 ENV DOCKERIZE_VERSION v0.6.1
 
 # Copy the Python script into the container
-COPY cdm_cypher_queries.py /app/
+COPY memgraph_queries.py /app/
 
 # Set environment variable 'chunk_size' and 'number_of_insertions'
 ENV db_port='7687'
@@ -19,4 +19,4 @@ ENV db_host='localhost'
 ENV interval=15
 ENV query_file_path='/app/queries/queries.txt'
 
-CMD [ "python","cdm_cypher_queries.py" ]
\ No newline at end of file
+CMD [ "python","memgraph_queries.py" ]
\ No newline at end of file
diff --git a/code/eval/queries/cypher/memgraph/memgraph_queries.py b/code/eval/queries/cypher/memgraph/memgraph_queries.py
index 8126d53ca4792a253cab31997c3e8af849eb633c..0ebc5629792cf0a4a219dced66f23405f13f2528 100644
--- a/code/eval/queries/cypher/memgraph/memgraph_queries.py
+++ b/code/eval/queries/cypher/memgraph/memgraph_queries.py
@@ -3,10 +3,11 @@ import schedule
 import time
 import logging
 import os
+import csv
 
 PORT = int(os.getenv('db_port',default=7687))
 HOST = str(os.getenv('db_host',default='localhost'))
-INTERVAL = int(os.getenv('interval',default=15))
+INTERVAL = int(os.getenv('interval',default=1))
 
 log_directory = "log"
 queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))
@@ -14,6 +15,19 @@ queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05
 uri = f"bolt://{HOST}:{PORT}"
 auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication
 
+# create log directory
+if not os.path.exists(log_directory):
+    os.makedirs(log_directory)
+
+# Setup logging to file
+formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
+logging.basicConfig(level=logging.INFO,
+                    filename=f"{log_directory}/{formatted_time}_query_log.log",
+                    filemode='a', # Append to the log file if it exists, 'w' to overwrite
+                    format='%(asctime)s - %(levelname)s - %(message)s')
+
+csv_log_file = open(f'{log_directory}/{formatted_time}_query_logs.csv', mode='a', newline='')
+
 cypher_queries = {
     "anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
     "desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
@@ -30,50 +44,83 @@ def load_queries_from_file(file_path):
     return cypher_queries
 
 # Function to execute a specific Cypher query and log response time
+# def execute_and_log_query(query_key, query):
+#     start_time = time.time()
+#     with driver.session() as session:
+#         result = session.run(query)
+#         summary = result.consume()
+#         end_time = time.time()
+#         # Check if the attributes are None and set them to 0 if they are
+#         result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
+#         result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
+#         execution_time = result_available_after + result_consumed_after
+
+#         logging.info(f"Results for Query {query_key}:")
+#         logging.info(f"Python executed in: {end_time - start_time}s")
+#         if execution_time == 0:
+#             logging.warning(f"No internal DBMS metric available")
+#         else:
+#             logging.info(f"DBMS answered in: {execution_time} ms\n")
+
 def execute_and_log_query(query_key, query):
     start_time = time.time()
     with driver.session() as session:
         result = session.run(query)
-        end_time = time.time()
+        # end_time = time.time()
+        row_count = 0
+        if query_key == '2-hop':
+            for record in result:
+                nodes = record.get('nodes')
+                if nodes is not None: # Check if 'nodes' is not None
+                    row_count += len(nodes)
+        else:
+            row_count= len(result.data())
         summary = result.consume()
-        # Check if the attributes are None and set them to 0 if they are
-        result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
-        result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
-        execution_time = result_available_after + result_consumed_after
+        end_time = time.time()
+        # Check if the attributes are None and set them to 0 if they are
+        result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
+        result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
+        row_count = row_count if row_count is not None else 0
+        execution_time = result_available_after
+        total_time2 = result_available_after + result_consumed_after
+        total_time = end_time - start_time
 
         logging.info(f"Results for Query {query_key}:")
-        logging.info(f"Python executed in: {end_time - start_time}s")
+        logging.info(f"Python executed in: {total_time}s")
+
+        if row_count == 0:
+            logging.info(f"Number of rows not available")
+        else:
+            logging.info(f"Number of rows {row_count}")
         if execution_time == 0:
             logging.warning(f"No internal DBMS metric available")
         else:
             logging.info(f"DBMS answered in: {execution_time} ms\n")
+            logging.info(f"App python answered in: {total_time} ms\n")
+            logging.info(f"App db answered in: {total_time2} ms\n")
+
+
+
+        # Open the CSV file in append mode and write the log information
+        writer = csv.writer(csv_log_file)
+        # Write a header if the file is newly created or empty, else append the data
+        csv_log_file.seek(0, 2) # Move the cursor to the end of the file
+        if csv_log_file.tell() == 0: # If file is empty, write a header
+            writer.writerow(['Query Key', 'Start Time', 'End Time','Fetched nodes', 'Execution Time (ms)', 'Total Time (s)'])
+        writer.writerow([query_key, start_time, end_time, row_count, execution_time, total_time])
 
 # Function to schedule and execute all queries
 def schedule_and_execute_queries():
     for query_key, query in cypher_queries.items():
-        execute_and_log_query(query_key=query_key, query=query)
         # Example scheduling: execute immediately then every minute for demonstration
         # TODO: Create the schedule here
         schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query)
-        # schedule.every(15).minutes.do(execute_and_log_query, query_key=query_key, query=query)
-        # schedule.every(30).minutes.do()
     while True:
         schedule.run_pending()
         time.sleep(1)
 
-# create log directory
-if not os.path.exists(log_directory):
-    os.makedirs(log_directory)
-
-# Setup logging to file
-formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
-logging.basicConfig(level=logging.INFO,
-                    filename=f"{log_directory}/{formatted_time}_query_log.log",
-                    filemode='a', # Append to the log file if it exists, 'w' to overwrite
-                    format='%(asctime)s - %(levelname)s - %(message)s')
-
 cypher_queries = load_queries_from_file(queries_file_path)
 # Initialize the Neo4j connection
 driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False)
diff --git a/code/eval/queries/cypher/neo4j/Dockerfile b/code/eval/queries/cypher/neo4j/Dockerfile
index 5fefbd1c3a953846b01acd43698670c825d2e24e..05340a862ffae22febcea5afd48fcdca7c3862cb 100644
--- a/code/eval/queries/cypher/neo4j/Dockerfile
+++ b/code/eval/queries/cypher/neo4j/Dockerfile
@@ -11,7 +11,7 @@ RUN pip install schedule
 ENV DOCKERIZE_VERSION v0.6.1
 
 # Copy the Python script into the container
-COPY cdm_cypher_queries.py /app/
+COPY neo4j_queries.py /app/
 
 # Set environment variable 'chunk_size' and 'number_of_insertions'
 ENV db_port='7687'
@@ -19,4 +19,4 @@ ENV db_host='localhost'
 ENV interval=15
 ENV query_file_path='/app/queries/queries.txt'
 
-CMD [ "python","cdm_cypher_queries.py" ]
\ No newline at end of file
+CMD [ "python","neo4j_queries.py" ]
\ No newline at end of file
diff --git a/code/eval/queries/cypher/neo4j/neo4j_queries.py b/code/eval/queries/cypher/neo4j/neo4j_queries.py
index 7cb739d33cdf5a72e4653396d5ae4032754ead76..046936d0f7d5d43e4959bc9a71c3bd918ad2aa8a 100644
--- a/code/eval/queries/cypher/neo4j/neo4j_queries.py
+++ b/code/eval/queries/cypher/neo4j/neo4j_queries.py
@@ -3,10 +3,11 @@ import schedule
 import time
 import logging
 import os
+import csv
 
 PORT = int(os.getenv('db_port',default=7687))
 HOST = str(os.getenv('db_host',default='localhost'))
-INTERVAL = int(os.getenv('interval',default=15))
+INTERVAL = int(os.getenv('interval',default=1))
 
 log_directory = "log"
 queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))
@@ -14,12 +15,26 @@ queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05
 uri = f"bolt://{HOST}:{PORT}"
 auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication
 
-cypher_queries = {
-    "anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
-    "desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
-    "path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
-    "2-hop": "MATCH (startNode: Host)-[*1..2]-(neighborhood) RETURN neighborhood;"
-}
+# create log directory
+if not os.path.exists(log_directory):
+    os.makedirs(log_directory)
+
+# Setup logging to file
+formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
+logging.basicConfig(level=logging.INFO,
+                    filename=f"{log_directory}/{formatted_time}_query_log.log",
+                    filemode='a', # Append to the log file if it exists, 'w' to overwrite
+                    format='%(asctime)s - %(levelname)s - %(message)s')
+
+csv_log_file = open(f'{log_directory}/{formatted_time}_query_logs.csv', mode='a', newline='')
+
+# cypher_queries = {
+#     "count":"MATCH (n) return n",
+#     "anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
+#     "desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
+#     "path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
+#     "2-hop": "MATCH (startNode WHERE startNode._uuid = '9FF334BB-9072-D756-B290-556656D73728')-[*1..2]-(neighborhood) RETURN neighborhood;"
+# }
 def load_queries_from_file(file_path):
     cypher_queries = {}
     with open(file_path, 'r') as file:
@@ -30,47 +45,76 @@ def load_queries_from_file(file_path):
     return cypher_queries
 
 # Function to execute a specific Cypher query and log response time
+# def execute_and_log_query(query_key, query):
+#     start_time = time.time()
+#     with driver.session() as session:
+#         result = session.run(query)
+#         summary = result.consume()
+#         end_time = time.time()
+#         # Check if the attributes are None and set them to 0 if they are
+#         result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
+#         result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
+#         execution_time = result_available_after + result_consumed_after
+
+#         logging.info(f"Results for Query {query_key}:")
+#         logging.info(f"Python executed in: {end_time - start_time}s")
+#         if execution_time == 0:
+#             logging.warning(f"No internal DBMS metric available")
+#         else:
+#             logging.info(f"DBMS answered in: {execution_time} ms\n")
+
 def execute_and_log_query(query_key, query):
     start_time = time.time()
     with driver.session() as session:
         result = session.run(query)
-        end_time = time.time()
+        # end_time = time.time()
+        row_count= len(result.data())
         summary = result.consume()
-        # Depending on the driver version, you might directly get the execution time from the summary
-        execution_time = summary.result_available_after + summary.result_consumed_after
-        execution_time = summary.result_available_after
-        end_time = summary.result_available_after + summary.result_consumed_after
+        end_time = time.time()
+        # Check if the attributes are None and set them to 0 if they are
+        result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
+        result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
+        row_count = row_count if row_count is not None else 0
+        execution_time = result_available_after
+        total_time2 = result_available_after + result_consumed_after
+        total_time = end_time - start_time
 
         logging.info(f"Results for Query {query_key}:")
-        logging.info(f"Python executed in: {end_time - start_time}s")
-        logging.info(f"DBMS answered in: {execution_time} ms\n")
+        logging.info(f"Python executed in: {total_time}s")
+
+        if row_count == 0:
+            logging.info(f"Number of rows not available")
+        else:
+            logging.info(f"Number of rows {row_count}")
+        if execution_time == 0:
+            logging.warning(f"No internal DBMS metric available")
+        else:
+            logging.info(f"DBMS answered in: {execution_time} ms\n")
+            logging.info(f"App python answered in: {total_time} ms\n")
+            logging.info(f"App db answered in: {total_time2} ms\n")
+
+
+
+        # Open the CSV file in append mode and write the log information
+        writer = csv.writer(csv_log_file)
+        # Write a header if the file is newly created or empty, else append the data
+        csv_log_file.seek(0, 2) # Move the cursor to the end of the file
+        if csv_log_file.tell() == 0: # If file is empty, write a header
+            writer.writerow(['Query Key', 'Start Time', 'End Time','Fetched nodes', 'Execution Time (ms)', 'Total Time (s)'])
+        writer.writerow([query_key, start_time, end_time, row_count, execution_time, total_time])
 
 # Function to schedule and execute all queries
 def schedule_and_execute_queries():
     for query_key, query in cypher_queries.items():
-        execute_and_log_query(query_key=query_key, query=query)
         # Example scheduling: execute immediately then every minute for demonstration
         # TODO: Create the schedule here
         schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query)
-        # schedule.every(15).minutes.do(execute_and_log_query, query_key=query_key, query=query)
-        # schedule.every(30).minutes.do()
     while True:
         schedule.run_pending()
         time.sleep(1)
 
-# create log directory
-if not os.path.exists(log_directory):
-    os.makedirs(log_directory)
-
-# Setup logging to file
-formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
-logging.basicConfig(level=logging.INFO,
-                    filename=f"{log_directory}/{formatted_time}_query_log.log",
-                    filemode='a', # Append to the log file if it exists, 'w' to overwrite
-                    format='%(asctime)s - %(levelname)s - %(message)s')
-
 cypher_queries = load_queries_from_file(queries_file_path)
# Initialize the Neo4j connection
 driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False)
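
Note on the pattern both patched scripts converge on: drain the result first (so the driver has actually fetched the rows), call result.consume() to obtain the ResultSummary, read result_available_after / result_consumed_after (both reported in milliseconds and either may be None depending on the server, hence the defaulting to 0), and append one CSV row per query. A minimal, self-contained sketch of that flow follows; it is not part of the patch, the helper names run_and_measure and log_row and the localhost endpoint are illustrative, and it uses only the same neo4j driver API the scripts already rely on.

    import csv
    import time

    from neo4j import GraphDatabase

    def run_and_measure(driver, query):
        """Run one query; return (row_count, dbms_ms, wall_s)."""
        start = time.time()
        with driver.session() as session:
            result = session.run(query)
            rows = result.data()        # drain the stream before consume()
            summary = result.consume()  # ResultSummary with server-side timings
        wall_s = time.time() - start
        # Either counter may be None depending on the server, so default to 0.
        available = summary.result_available_after or 0
        consumed = summary.result_consumed_after or 0
        return len(rows), available + consumed, wall_s

    def log_row(csv_path, query_key, row_count, dbms_ms, wall_s):
        # Open per write instead of keeping one module-level handle open;
        # the header check then also works across container restarts.
        with open(csv_path, 'a', newline='') as f:
            writer = csv.writer(f)
            f.seek(0, 2)               # position at end so tell() reports the size
            if f.tell() == 0:          # empty file: write the header first
                writer.writerow(['Query Key', 'Fetched rows',
                                 'DBMS Time (ms)', 'Total Time (s)'])
            writer.writerow([query_key, row_count, dbms_ms, wall_s])

    if __name__ == '__main__':
        # Placeholder endpoint; mirrors the scripts' unauthenticated Bolt setup.
        driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""))
        rows, dbms_ms, wall_s = run_and_measure(driver, "MATCH (n) RETURN n LIMIT 10;")
        log_row("query_logs.csv", "sample", rows, dbms_ms, wall_s)
        driver.close()

One difference worth flagging: the sketch reports available + consumed as the DBMS time (what the patched scripts log as "App db"), whereas the patch stores result_available_after alone in the "Execution Time (ms)" CSV column. Which of the two is the intended metric should be settled before comparing the Neo4j and Memgraph numbers.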