from neo4j import GraphDatabase
import schedule
import time
import logging
import os
import csv

# --- Configuration (overridable via environment variables) ---
PORT = int(os.getenv('db_port', default=7687))        # Neo4j Bolt port
HOST = str(os.getenv('db_host', default='localhost'))  # Neo4j host
INTERVAL = int(os.getenv('interval', default=1))       # minutes between scheduled query runs
log_directory = "log"
queries_file_path = str(os.getenv(
    'query_file_path',
    default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))

# Connection details
uri = f"bolt://{HOST}:{PORT}"
auth = ("", "")  # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication

# Create the log directory if it does not exist yet
if not os.path.exists(log_directory):
    os.makedirs(log_directory)

# Setup logging to a timestamped file
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
                    filename=f"{log_directory}/{formatted_time}_query_log.log",
                    filemode='a',  # Append to the log file if it exists, 'w' to overwrite
                    format='%(asctime)s - %(levelname)s - %(message)s')

# CSV file that collects one row of timing metrics per executed query.
# Kept open for the lifetime of the script; closed in the main block's shutdown path.
csv_log_file = open(f'{log_directory}/{formatted_time}_query_logs.csv', mode='a', newline='')

# Built-in default queries; replaced at startup by load_queries_from_file().
cypher_queries = {
    "anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
    "desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
    "path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
    "2-hop": "MATCH (startNode: Host)-[*1..2]-(neighborhood) RETURN neighborhood;"
}


def load_queries_from_file(file_path):
    """Load Cypher queries from a text file into a dict.

    Each line is expected to have the form ``key|cypher query``; lines
    without a ``|`` separator are silently skipped as malformed.

    Parameters:
        file_path: path to the queries file.

    Returns:
        dict mapping query key -> Cypher query string.
    """
    queries = {}  # local name distinct from the module-level cypher_queries
    with open(file_path, 'r') as file:
        for line in file:
            if '|' in line:  # Simple validation to ignore malformed lines
                key, query = line.strip().split('|', 1)
                queries[key] = query
    return queries
def execute_and_log_query(query_key, query):
    """Run one Cypher query, log timing metrics, and append them to the CSV log.

    Parameters:
        query_key: short identifier of the query (used in log/CSV output).
        query: the Cypher statement to execute.

    Records the wall-clock time measured in Python as well as the DBMS-internal
    timing (result_available_after + result_consumed_after, in ms) taken from
    the Neo4j result summary. Uses the module-level ``driver`` and
    ``csv_log_file`` objects.
    """
    start_time = time.time()
    with driver.session() as session:
        result = session.run(query)
        # Materialize all records so the row count reflects the full result set;
        # consume() afterwards yields the summary with the internal timings.
        row_count = len(result.data())
        summary = result.consume()
        end_time = time.time()

        # The driver may report None for its internal timings; treat that as 0.
        result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
        result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
        execution_time = result_available_after + result_consumed_after

        logging.info(f"Results for Query {query_key}:")
        logging.info(f"Python executed in: {end_time - start_time}s")
        if row_count == 0:
            logging.info("Number of rows not available")
        else:
            logging.info(f"Number of rows {row_count}")
        if execution_time == 0:
            logging.warning("No internal DBMS metric available")
        else:
            logging.info(f"DBMS answered in: {execution_time} ms\n")

        # Append the metrics to the shared CSV log file.
        writer = csv.writer(csv_log_file)
        csv_log_file.seek(0, 2)   # Move the cursor to the end of the file
        if csv_log_file.tell() == 0:  # If file is empty, write a header first
            writer.writerow(['Query Key', 'Start Time', 'End Time', 'Fetched nodes', 'Execution Time (ms)', 'Total Time (s)'])
        writer.writerow([query_key, start_time, end_time, row_count, execution_time, end_time - start_time])


def schedule_and_execute_queries():
    """Execute every configured query once, then re-run each on a fixed schedule.

    Each query is executed immediately and also registered to run every
    INTERVAL minutes. Blocks forever servicing the schedule (until the
    process is interrupted).
    """
    for query_key, query in cypher_queries.items():
        execute_and_log_query(query_key=query_key, query=query)
        # Example scheduling: execute immediately then every INTERVAL minutes
        schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query)
    while True:
        schedule.run_pending()
        time.sleep(1)


# Replace the built-in default queries with the ones configured on disk.
cypher_queries = load_queries_from_file(queries_file_path)

# Initialize the Neo4j connection
driver = GraphDatabase.driver(uri=uri, auth=auth, encrypted=False)
logging.info(f"Connected to db... {uri}")

# Main execution block with graceful shutdown
if __name__ == "__main__":
    try:
        schedule_and_execute_queries()
    except KeyboardInterrupt:
        logging.info("Script interrupted by user")
    finally:
        driver.close()
        csv_log_file.close()  # flush and release the CSV log (was previously never closed)
        logging.info("Database connection closed.")