Skip to content
Snippets Groups Projects
Commit 7a4585cf authored by Sven-Ove Hänsel's avatar Sven-Ove Hänsel
Browse files

seperate neo4j and memgraph queries

parent 772481cf
No related branches found
No related tags found
No related merge requests found
...@@ -70,7 +70,9 @@ def execute_and_log_query(query_key, query): ...@@ -70,7 +70,9 @@ def execute_and_log_query(query_key, query):
row_count = 0 row_count = 0
if query_key == '2-hop': if query_key == '2-hop':
for record in result: for record in result:
row_count += len(record.get('nodes')) nodes = record.get('nodes')
if nodes is not None: # Check if 'nodes' is not None
row_count += len(nodes)
else: else:
row_count= len(result.data()) row_count= len(result.data())
summary = result.consume() summary = result.consume()
......
...@@ -11,7 +11,7 @@ RUN pip install schedule ...@@ -11,7 +11,7 @@ RUN pip install schedule
ENV DOCKERIZE_VERSION v0.6.1 ENV DOCKERIZE_VERSION v0.6.1
# Copy the Python script into the container # Copy the Python script into the container
COPY cdm_cypher_queries.py /app/ COPY memgraph_queries.py /app/
# Set environment variable 'chunk_size' and 'number_of_insertions' # Set environment variable 'chunk_size' and 'number_of_insertions'
ENV db_port='7687' ENV db_port='7687'
...@@ -19,4 +19,4 @@ ENV db_host='localhost' ...@@ -19,4 +19,4 @@ ENV db_host='localhost'
ENV interval=15 ENV interval=15
ENV query_file_path='/app/queries/queries.txt' ENV query_file_path='/app/queries/queries.txt'
CMD [ "python","cdm_cypher_queries.py" ] CMD [ "python","memgraph_queries.py" ]
\ No newline at end of file \ No newline at end of file
...@@ -3,10 +3,11 @@ import schedule ...@@ -3,10 +3,11 @@ import schedule
import time import time
import logging import logging
import os import os
import csv
PORT = int(os.getenv('db_port',default=7687)) PORT = int(os.getenv('db_port',default=7687))
HOST = str(os.getenv('db_host',default='localhost')) HOST = str(os.getenv('db_host',default='localhost'))
INTERVAL = int(os.getenv('interval',default=15)) INTERVAL = int(os.getenv('interval',default=1))
log_directory = "log" log_directory = "log"
queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt")) queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))
...@@ -14,6 +15,19 @@ queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05 ...@@ -14,6 +15,19 @@ queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05
uri = f"bolt://{HOST}:{PORT}" uri = f"bolt://{HOST}:{PORT}"
auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication
# create log directory
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Setup logging to file
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
filename=f"{log_directory}/{formatted_time}_query_log.log",
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
csv_log_file = open(f'{log_directory}/{formatted_time}_query_logs.csv', mode='a', newline='')
cypher_queries = { cypher_queries = {
"anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;", "anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
"desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;", "desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
...@@ -30,50 +44,83 @@ def load_queries_from_file(file_path): ...@@ -30,50 +44,83 @@ def load_queries_from_file(file_path):
return cypher_queries return cypher_queries
# Function to execute a specific Cypher query and log response time # Function to execute a specific Cypher query and log response time
# def execute_and_log_query(query_key, query):
# start_time = time.time()
# with driver.session() as session:
# result = session.run(query)
# summary = result.consume()
# end_time = time.time()
# # Check if the attributes are None and set them to 0 if they are
# result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
# result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
# execution_time = result_available_after + result_consumed_after
# logging.info(f"Results for Query {query_key}:")
# logging.info(f"Python executed in: {end_time - start_time}s")
# if execution_time == 0:
# logging.warning(f"No internal DBMS metric available")
# else:
# logging.info(f"DBMS answered in: {execution_time} ms\n")
def execute_and_log_query(query_key, query): def execute_and_log_query(query_key, query):
start_time = time.time() start_time = time.time()
with driver.session() as session: with driver.session() as session:
result = session.run(query) result = session.run(query)
end_time = time.time() # end_time = time.time()
row_count = 0
if query_key == '2-hop':
for record in result:
nodes = record.get('nodes')
if nodes is not None: # Check if 'nodes' is not None
row_count += len(nodes)
else:
row_count= len(result.data())
summary = result.consume() summary = result.consume()
end_time = time.time()
# Check if the attributes are None and set them to 0 if they are # Check if the attributes are None and set them to 0 if they are
result_available_after = summary.result_available_after if summary.result_available_after is not None else 0 result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0 result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
execution_time = result_available_after + result_consumed_after row_count = row_count if row_count is not None else 0
execution_time = result_available_after
total_time2 = result_available_after + result_consumed_after
total_time = end_time - start_time
logging.info(f"Results for Query {query_key}:") logging.info(f"Results for Query {query_key}:")
logging.info(f"Python executed in: {end_time - start_time}s") logging.info(f"Python executed in: {total_time}s")
if row_count == 0:
logging.info(f"Number of rows not available")
else:
logging.info(f"Number of rows {row_count}")
if execution_time == 0: if execution_time == 0:
logging.warning(f"No internal DBMS metric available") logging.warning(f"No internal DBMS metric available")
else: else:
logging.info(f"DBMS answered in: {execution_time} ms\n") logging.info(f"DBMS answered in: {execution_time} ms\n")
logging.info(f"App python answered in: {total_time} ms\n")
logging.info(f"App db answered in: {total_time2} ms\n")
# Open the CSV file in append mode and write the log information
writer = csv.writer(csv_log_file)
# Write a header if the file is newly created or empty, else append the data
csv_log_file.seek(0, 2) # Move the cursor to the end of the file
if csv_log_file.tell() == 0: # If file is empty, write a header
writer.writerow(['Query Key', 'Start Time', 'End Time','Fetched nodes', 'Execution Time (ms)', 'Total Time (s)'])
writer.writerow([query_key, start_time, end_time, row_count, execution_time, total_time])
# Function to schedule and execute all queries # Function to schedule and execute all queries
def schedule_and_execute_queries(): def schedule_and_execute_queries():
for query_key, query in cypher_queries.items(): for query_key, query in cypher_queries.items():
execute_and_log_query(query_key=query_key, query=query) execute_and_log_query(query_key=query_key, query=query)
# Example scheduling: execute immediately then every minute for demonstration # Example scheduling: execute immediately then every minute for demonstration
# TODO: Create the schedule here # TODO: Create the schedule here
schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query) schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(15).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(30).minutes.do()
while True: while True:
schedule.run_pending() schedule.run_pending()
time.sleep(1) time.sleep(1)
# create log directory
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Setup logging to file
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
filename=f"{log_directory}/{formatted_time}_query_log.log",
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
cypher_queries = load_queries_from_file(queries_file_path) cypher_queries = load_queries_from_file(queries_file_path)
# Initialize the Neo4j connection # Initialize the Neo4j connection
driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False) driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False)
......
...@@ -11,7 +11,7 @@ RUN pip install schedule ...@@ -11,7 +11,7 @@ RUN pip install schedule
ENV DOCKERIZE_VERSION v0.6.1 ENV DOCKERIZE_VERSION v0.6.1
# Copy the Python script into the container # Copy the Python script into the container
COPY cdm_cypher_queries.py /app/ COPY neo4j_queries.py /app/
# Set environment variable 'chunk_size' and 'number_of_insertions' # Set environment variable 'chunk_size' and 'number_of_insertions'
ENV db_port='7687' ENV db_port='7687'
...@@ -19,4 +19,4 @@ ENV db_host='localhost' ...@@ -19,4 +19,4 @@ ENV db_host='localhost'
ENV interval=15 ENV interval=15
ENV query_file_path='/app/queries/queries.txt' ENV query_file_path='/app/queries/queries.txt'
CMD [ "python","cdm_cypher_queries.py" ] CMD [ "python","neo4j_queries.py" ]
\ No newline at end of file \ No newline at end of file
...@@ -3,10 +3,11 @@ import schedule ...@@ -3,10 +3,11 @@ import schedule
import time import time
import logging import logging
import os import os
import csv
PORT = int(os.getenv('db_port',default=7687)) PORT = int(os.getenv('db_port',default=7687))
HOST = str(os.getenv('db_host',default='localhost')) HOST = str(os.getenv('db_host',default='localhost'))
INTERVAL = int(os.getenv('interval',default=15)) INTERVAL = int(os.getenv('interval',default=1))
log_directory = "log" log_directory = "log"
queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt")) queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))
...@@ -14,12 +15,26 @@ queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05 ...@@ -14,12 +15,26 @@ queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05
uri = f"bolt://{HOST}:{PORT}" uri = f"bolt://{HOST}:{PORT}"
auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication
cypher_queries = { # create log directory
"anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;", if not os.path.exists(log_directory):
"desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;", os.makedirs(log_directory)
"path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
"2-hop": "MATCH (startNode: Host)-[*1..2]-(neighborhood) RETURN neighborhood;" # Setup logging to file
} formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
filename=f"{log_directory}/{formatted_time}_query_log.log",
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
csv_log_file = open(f'{log_directory}/{formatted_time}_query_logs.csv', mode='a', newline='')
# cypher_queries = {
# "count":"MATCH (n) return n",
# "anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
# "desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
# "path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
# "2-hop": "MATCH (startNode WHERE startNode._uuid = '9FF334BB-9072-D756-B290-556656D73728')-[*1..2]-(neighborhood) RETURN neighborhood;"
# }
def load_queries_from_file(file_path): def load_queries_from_file(file_path):
cypher_queries = {} cypher_queries = {}
with open(file_path, 'r') as file: with open(file_path, 'r') as file:
...@@ -30,47 +45,76 @@ def load_queries_from_file(file_path): ...@@ -30,47 +45,76 @@ def load_queries_from_file(file_path):
return cypher_queries return cypher_queries
# Function to execute a specific Cypher query and log response time # Function to execute a specific Cypher query and log response time
# def execute_and_log_query(query_key, query):
# start_time = time.time()
# with driver.session() as session:
# result = session.run(query)
# summary = result.consume()
# end_time = time.time()
# # Check if the attributes are None and set them to 0 if they are
# result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
# result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
# execution_time = result_available_after + result_consumed_after
# logging.info(f"Results for Query {query_key}:")
# logging.info(f"Python executed in: {end_time - start_time}s")
# if execution_time == 0:
# logging.warning(f"No internal DBMS metric available")
# else:
# logging.info(f"DBMS answered in: {execution_time} ms\n")
def execute_and_log_query(query_key, query): def execute_and_log_query(query_key, query):
start_time = time.time() start_time = time.time()
with driver.session() as session: with driver.session() as session:
result = session.run(query) result = session.run(query)
end_time = time.time() # end_time = time.time()
row_count= len(result.data())
summary = result.consume() summary = result.consume()
# Depending on the driver version, you might directly get the execution time from the summary end_time = time.time()
execution_time = summary.result_available_after + summary.result_consumed_after
execution_time = summary.result_available_after
end_time = summary.result_available_after + summary.result_consumed_after
# Check if the attributes are None and set them to 0 if they are
result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
row_count = row_count if row_count is not None else 0
execution_time = result_available_after
total_time2 = result_available_after + result_consumed_after
total_time = end_time - start_time
logging.info(f"Results for Query {query_key}:") logging.info(f"Results for Query {query_key}:")
logging.info(f"Python executed in: {end_time - start_time}s") logging.info(f"Python executed in: {total_time}s")
if row_count == 0:
logging.info(f"Number of rows not available")
else:
logging.info(f"Number of rows {row_count}")
if execution_time == 0:
logging.warning(f"No internal DBMS metric available")
else:
logging.info(f"DBMS answered in: {execution_time} ms\n") logging.info(f"DBMS answered in: {execution_time} ms\n")
logging.info(f"App python answered in: {total_time} ms\n")
logging.info(f"App db answered in: {total_time2} ms\n")
# Open the CSV file in append mode and write the log information
writer = csv.writer(csv_log_file)
# Write a header if the file is newly created or empty, else append the data
csv_log_file.seek(0, 2) # Move the cursor to the end of the file
if csv_log_file.tell() == 0: # If file is empty, write a header
writer.writerow(['Query Key', 'Start Time', 'End Time','Fetched nodes', 'Execution Time (ms)', 'Total Time (s)'])
writer.writerow([query_key, start_time, end_time, row_count, execution_time, total_time])
# Function to schedule and execute all queries # Function to schedule and execute all queries
def schedule_and_execute_queries(): def schedule_and_execute_queries():
for query_key, query in cypher_queries.items(): for query_key, query in cypher_queries.items():
execute_and_log_query(query_key=query_key, query=query) execute_and_log_query(query_key=query_key, query=query)
# Example scheduling: execute immediately then every minute for demonstration # Example scheduling: execute immediately then every minute for demonstration
# TODO: Create the schedule here # TODO: Create the schedule here
schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query) schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(15).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(30).minutes.do()
while True: while True:
schedule.run_pending() schedule.run_pending()
time.sleep(1) time.sleep(1)
# create log directory
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Setup logging to file
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
filename=f"{log_directory}/{formatted_time}_query_log.log",
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
cypher_queries = load_queries_from_file(queries_file_path) cypher_queries = load_queries_from_file(queries_file_path)
# Initialize the Neo4j connection # Initialize the Neo4j connection
driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False) driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment