Skip to content
Snippets Groups Projects
Select Git revision
  • 32ac499995c865378a2e21f9377dda3407600b6a
  • master default
  • v0.52.5
  • v0.52.4
  • v0.52.3
  • v0.52.2
  • v0.52.1
  • v0.52.0
  • v0.51.0
  • v0.50.0
  • v0.49.0
11 results

import_users.py

Blame
  • cypher_queries.py 5.47 KiB
    from neo4j import GraphDatabase
    import schedule
    import time
    import logging
    import os
    import csv
    
    # Runtime configuration, overridable through environment variables.
    PORT = int(os.getenv('db_port', default=7687))
    HOST = os.getenv('db_host', default='localhost')
    INTERVAL = int(os.getenv('interval', default=1))  # minutes between runs of each query
    log_directory = "log"
    queries_file_path = os.getenv('query_file_path', default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt")

    # Connection details
    uri = f"bolt://{HOST}:{PORT}"
    auth = ("", "")  # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication

    # Create the log directory; exist_ok avoids the race between an
    # os.path.exists() check and the subsequent makedirs() call.
    os.makedirs(log_directory, exist_ok=True)

    # Setup logging to file. Each process start gets its own timestamped
    # .log / .csv pair.
    formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logging.basicConfig(level=logging.INFO,
                        filename=f"{log_directory}/{formatted_time}_query_log.log",
                        filemode='a',  # Append to the log file if it exists, 'w' to overwrite
                        format='%(asctime)s - %(levelname)s - %(message)s')

    # CSV file with one row per query execution. It stays open while the script
    # runs. buffering=1 selects line buffering, so every row is written through
    # to disk immediately instead of sitting in a buffer that is lost if the
    # process is killed abruptly.
    csv_log_file = open(f'{log_directory}/{formatted_time}_query_logs.csv', mode='a', newline='', buffering=1)
    
    # Built-in example queries, keyed by short name. NOTE: this mapping is
    # replaced at module level by load_queries_from_file(queries_file_path)
    # before the scheduler starts, so it mainly illustrates the expected shape.
    cypher_queries = {
        # All nodes reachable via outgoing relationships from any :Host node,
        # ordered by path length.
        "anc": (
            "MATCH path=(node:Host)-[*]->(ancestor) "
            "RETURN ancestor, length(path) AS distance_upstream "
            "ORDER BY distance_upstream;"
        ),
        # All nodes with a path of incoming relationships into any :Host node,
        # ordered by path length.
        "desc": (
            "MATCH path=(node:Host)<-[*]-(descendant) "
            "RETURN descendant, length(path) AS distance_downstream "
            "ORDER BY distance_downstream;"
        ),
        # Shortest undirected path from one specific :Subject (by _uuid) to an :Event.
        "path": (
            "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) "
            "where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' "
            "RETURN path;"
        ),
        # Nodes within one or two hops (either direction) of any :Host node.
        "2-hop": (
            "MATCH (startNode: Host)-[*1..2]-(neighborhood) "
            "RETURN neighborhood;"
        ),
    }
    def load_queries_from_file(file_path, encoding="utf-8"):
        """Parse a query file into a ``{key: cypher_query}`` dict.

        Each meaningful line has the form ``key|query``. Only the first ``|``
        separates the two, so the query itself may contain ``|`` (e.g. the
        relationship-type alternation ``[:A|B]``). Whitespace around both key
        and query is stripped.

        The following lines are skipped:
        - blank lines;
        - lines starting with ``#``;
        - lines without a ``|``;
        - lines whose key or query is empty.

        If a key appears more than once, the last query wins.

        Args:
            file_path: Path of the text file to read.
            encoding: Text encoding of the file. Defaults to UTF-8 so the result
                does not depend on the platform's locale encoding.

        Returns:
            Dict mapping query keys to Cypher statements, in order of first
            appearance in the file.

        Raises:
            OSError: If the file cannot be opened (e.g. FileNotFoundError).
        """
        queries = {}
        with open(file_path, 'r', encoding=encoding) as file:
            for raw_line in file:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                key, separator, query = line.partition('|')
                key, query = key.strip(), query.strip()
                if not separator or not key or not query:
                    continue  # malformed line: no delimiter, or nothing on one side
                queries[key] = query
        return queries
    
    def execute_and_log_query(query_key, query):
        """Run one Cypher query and log its timings to the log file and the CSV log.

        Uses the module-level ``driver`` and ``csv_log_file``.

        Args:
            query_key: Short name identifying the query in the logs.
            query: Cypher statement passed verbatim to ``session.run``.

        Each run appends one CSV row containing:
        - start and end wall-clock timestamps (epoch seconds);
        - the number of rows returned;
        - the server-reported time in ms (``result_available_after +
          result_consumed_after``, 0 if the server did not report them);
        - the client-side elapsed time in seconds.

        A header row is written first if the CSV file is empty.
        """
        start_time = time.time()              # epoch timestamp for the CSV
        start_counter = time.perf_counter()   # monotonic clock for the duration
        with driver.session() as session:
            result = session.run(query)
            # result.data() reads all remaining records, so the client-side
            # time covers fetching the whole result, not just the first row.
            row_count = len(result.data())
            summary = result.consume()
            elapsed = time.perf_counter() - start_counter
            end_time = time.time()

        # The summary timings can be None; treat a missing value as 0.
        result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
        result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
        execution_time = result_available_after + result_consumed_after

        logging.info(f"Results for Query {query_key}:")
        logging.info(f"Python executed in: {elapsed}s")
        # 0 is a legitimate result size, so the count is always reported.
        logging.info(f"Number of rows {row_count}")
        if execution_time == 0:
            logging.warning("No internal DBMS metric available")
        else:
            logging.info(f"DBMS answered in: {execution_time} ms\n")

        writer = csv.writer(csv_log_file)
        csv_log_file.seek(0, 2)  # Move the cursor to the end of the file
        if csv_log_file.tell() == 0:  # Empty file: write the header first
            writer.writerow(['Query Key', 'Start Time', 'End Time', 'Fetched nodes', 'Execution Time (ms)', 'Total Time (s)'])
        writer.writerow([query_key, start_time, end_time, row_count, execution_time, elapsed])
        # Flush so the row reaches the file even if the process is later
        # killed without a clean shutdown.
        csv_log_file.flush()
    
    def schedule_and_execute_queries():
        """Run every query in ``cypher_queries`` once, then every INTERVAL minutes.

        Each query is executed immediately and then registered as its own
        ``schedule`` job. The function then polls the scheduler once per
        second forever. It only exits by raising, e.g. KeyboardInterrupt on
        Ctrl+C.

        Errors from a single query are logged and that run is skipped. The
        ``schedule`` library does not catch job exceptions itself, so without
        this one failing query (bad statement in the query file, database
        restart) would stop the whole monitoring loop.
        """

        def run_query_safely(query_key, query):
            # Catch Exception (not BaseException) so KeyboardInterrupt still
            # propagates and stops the loop.
            try:
                execute_and_log_query(query_key=query_key, query=query)
            except Exception:
                logging.exception("Query %s failed", query_key)

        for query_key, query in cypher_queries.items():
            run_query_safely(query_key=query_key, query=query)
            schedule.every(INTERVAL).minutes.do(run_query_safely, query_key=query_key, query=query)

        while True:
            schedule.run_pending()
            time.sleep(1)
    
    # Replace the built-in example queries with the ones from the query file.
    cypher_queries = load_queries_from_file(queries_file_path)
    logging.info(f"Loaded {len(cypher_queries)} queries from {queries_file_path}")
    if not cypher_queries:
        logging.warning(f"No queries found in {queries_file_path}; nothing will be executed")

    # Initialize the Neo4j driver. Creating it does not open a connection
    # (connections are made lazily on first session use), so the log below
    # does not confirm that the database is reachable.
    driver = GraphDatabase.driver(uri=uri, auth=auth, encrypted=False)
    logging.info(f"Initialized Neo4j driver for {uri}")
    
    # Main execution block with graceful shutdown
    if __name__ == "__main__":
        try:
            schedule_and_execute_queries()
        except KeyboardInterrupt:
            logging.info("Script interrupted by user")
        except Exception:
            # Record the crash in the log file too, not only on stderr.
            logging.exception("Unhandled error, shutting down")
            raise
        finally:
            driver.close()
            logging.info("Database connection closed.")
            # Release the CSV log opened at module level (flushes pending rows).
            csv_log_file.close()