Skip to content
Snippets Groups Projects
Commit f95c7184 authored by Sven-Ove Hänsel's avatar Sven-Ove Hänsel
Browse files

update query infrastructure

parent bed8d6e3
No related branches found
No related tags found
No related merge requests found
......@@ -13,6 +13,7 @@ declare -A paths=(
["pub_cdm"]="./infrastructure/streaming/clients/pub"
["cypher_queries"]="./eval/queries/cypher/driver_latest"
["sql_queries"]="./eval/queries/sql"
["ongdb_queries"]="./eval/queries/cypher/ongdb"
)
for name in "${!paths[@]}"; do
......
......@@ -8,6 +8,7 @@ PORT = int(os.getenv('db_port',default=7687))
HOST = str(os.getenv('db_host',default='localhost'))
INTERVAL = int(os.getenv('interval',default=15))
log_directory = "log"
queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))
# Connection details
uri = f"bolt://{HOST}:{PORT}"
......@@ -16,11 +17,17 @@ auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple
cypher_queries = {
"anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
"desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
"path": "MATCH path = shortestPath((a:Subject where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69')-[*]-(b:Event)) RETURN path;",
"path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
"2-hop": "MATCH (startNode: Host)-[*1..2]-(neighborhood) RETURN neighborhood;"
}
def load_queries_from_file(file_path):
cypher_queries = {}
with open(file_path, 'r') as file:
for line in file:
if '|' in line: # Simple validation to ignore malformed lines
key, query = line.strip().split('|', 1)
cypher_queries[key] = query
return cypher_queries
# Function to execute a specific Cypher query and log response time
def execute_and_log_query(query_key, query):
......@@ -29,12 +36,16 @@ def execute_and_log_query(query_key, query):
result = session.run(query)
summary = result.consume()
end_time = time.time()
# Depending on the driver version, you might directly get the execution time from the summary
execution_time = summary.result_available_after + summary.result_consumed_after
# Check if the attributes are None and set them to 0 if they are
result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
execution_time = result_available_after + result_consumed_after
logging.info(f"Results for Query {query_key}:")
logging.info(f"Python executed in: {end_time - start_time}s")
if execution_time == 0:
logging.warning(f"No internal DBMS metric available")
else:
logging.info(f"DBMS answered in: {execution_time} ms\n")
# Function to schedule and execute all queries
......@@ -63,7 +74,7 @@ logging.basicConfig(level=logging.INFO,
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
cypher_queries = load_queries_from_file(queries_file_path)
# Initialize the Neo4j connection
driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False)
logging.info(f"Connected to db... {uri}")
......
......@@ -11,7 +11,7 @@ RUN pip install schedule
ENV DOCKERIZE_VERSION v0.6.1
# Copy the Python script into the container
COPY cdm_cypher_queries.py /app/
COPY cypher_queries.py /app/
# Set environment variable 'chunk_size' and 'number_of_insertions'
ENV db_port='7687'
......@@ -19,4 +19,4 @@ ENV db_host='localhost'
ENV interval=15
ENV query_file_path='/app/queries/queries.txt'
CMD [ "python","cdm_cypher_queries.py" ]
\ No newline at end of file
CMD [ "python","cypher_queries.py" ]
\ No newline at end of file
from neo4j import GraphDatabase
import schedule
import time
import logging
import os
PORT = int(os.getenv('db_port',default=7687))
HOST = str(os.getenv('db_host',default='localhost'))
INTERVAL = int(os.getenv('interval',default=15))
log_directory = "log"
queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))
# Connection details
uri = f"bolt://{HOST}:{PORT}"
auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication
cypher_queries = {
"anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
"desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
"path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
"2-hop": "MATCH (startNode: Host)-[*1..2]-(neighborhood) RETURN neighborhood;"
}
def load_queries_from_file(file_path):
cypher_queries = {}
with open(file_path, 'r') as file:
for line in file:
if '|' in line: # Simple validation to ignore malformed lines
key, query = line.strip().split('|', 1)
cypher_queries[key] = query
return cypher_queries
# Function to execute a specific Cypher query and log response time
def execute_and_log_query(query_key, query):
start_time = time.time()
with driver.session() as session:
result = session.run(query)
summary = result.consume()
end_time = time.time()
# Check if the attributes are None and set them to 0 if they are
result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
execution_time = result_available_after + result_consumed_after
logging.info(f"Results for Query {query_key}:")
logging.info(f"Python executed in: {end_time - start_time}s")
if execution_time == 0:
logging.warning(f"No internal DBMS metric available")
else:
logging.info(f"DBMS answered in: {execution_time} ms\n")
# Function to schedule and execute all queries
def schedule_and_execute_queries():
for query_key, query in cypher_queries.items():
execute_and_log_query(query_key=query_key, query=query)
# Example scheduling: execute immediately then every minute for demonstration
# TODO: Create the schedule here
schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(15).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(30).minutes.do()
while True:
schedule.run_pending()
time.sleep(1)
# create log directory
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Setup logging to file
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
filename=f"{log_directory}/{formatted_time}_query_log.log",
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
cypher_queries = load_queries_from_file(queries_file_path)
# Initialize the Neo4j connection
driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False)
logging.info(f"Connected to db... {uri}")
# Main execution block with graceful shutdown
if __name__ == "__main__":
try:
schedule_and_execute_queries()
except KeyboardInterrupt:
logging.info("Script interrupted by user")
finally:
driver.close()
logging.info("Database connection closed.")
# Use an official Python runtime as a base image
FROM python:3.8
# Set working directory in the container
WORKDIR /app
# install dependencies
RUN pip install neo4j
RUN pip install schedule
ENV DOCKERIZE_VERSION v0.6.1
# Copy the Python script into the container
COPY cdm_cypher_queries.py /app/
# Set environment variable 'chunk_size' and 'number_of_insertions'
ENV db_port='7687'
ENV db_host='localhost'
ENV interval=15
ENV query_file_path='/app/queries/queries.txt'
CMD [ "python","cdm_cypher_queries.py" ]
\ No newline at end of file
from neo4j import GraphDatabase
import schedule
import time
import logging
import os
PORT = int(os.getenv('db_port',default=7687))
HOST = str(os.getenv('db_host',default='localhost'))
INTERVAL = int(os.getenv('interval',default=15))
log_directory = "log"
queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))
# Connection details
uri = f"bolt://{HOST}:{PORT}"
auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication
cypher_queries = {
"anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
"desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
"path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
"2-hop": "MATCH (startNode: Host)-[*1..2]-(neighborhood) RETURN neighborhood;"
}
def load_queries_from_file(file_path):
cypher_queries = {}
with open(file_path, 'r') as file:
for line in file:
if '|' in line: # Simple validation to ignore malformed lines
key, query = line.strip().split('|', 1)
cypher_queries[key] = query
return cypher_queries
# Function to execute a specific Cypher query and log response time
def execute_and_log_query(query_key, query):
start_time = time.time()
with driver.session() as session:
result = session.run(query)
summary = result.consume()
end_time = time.time()
# Check if the attributes are None and set them to 0 if they are
result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
execution_time = result_available_after + result_consumed_after
logging.info(f"Results for Query {query_key}:")
logging.info(f"Python executed in: {end_time - start_time}s")
if execution_time == 0:
logging.warning(f"No internal DBMS metric available")
else:
logging.info(f"DBMS answered in: {execution_time} ms\n")
# Function to schedule and execute all queries
def schedule_and_execute_queries():
for query_key, query in cypher_queries.items():
execute_and_log_query(query_key=query_key, query=query)
# Example scheduling: execute immediately then every minute for demonstration
# TODO: Create the schedule here
schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(15).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(30).minutes.do()
while True:
schedule.run_pending()
time.sleep(1)
# create log directory
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Setup logging to file
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
filename=f"{log_directory}/{formatted_time}_query_log.log",
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
cypher_queries = load_queries_from_file(queries_file_path)
# Initialize the Neo4j connection
driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False)
logging.info(f"Connected to db... {uri}")
# Main execution block with graceful shutdown
if __name__ == "__main__":
try:
schedule_and_execute_queries()
except KeyboardInterrupt:
logging.info("Script interrupted by user")
finally:
driver.close()
logging.info("Database connection closed.")
# Use an official Python runtime as a base image
FROM python:3.8
# Set working directory in the container
WORKDIR /app
# install dependencies
RUN pip install neo4j
RUN pip install schedule
ENV DOCKERIZE_VERSION v0.6.1
# Copy the Python script into the container
COPY cdm_cypher_queries.py /app/
# Set environment variable 'chunk_size' and 'number_of_insertions'
ENV db_port='7687'
ENV db_host='localhost'
ENV interval=15
ENV query_file_path='/app/queries/queries.txt'
CMD [ "python","cdm_cypher_queries.py" ]
\ No newline at end of file
# Use an official Python runtime as a base image
FROM python:3.8
# Set working directory in the container
WORKDIR /app
# install dependencies
RUN pip install neo4j==1.7.0
RUN pip install schedule
ENV DOCKERIZE_VERSION v0.6.1
# Copy the Python script into the container
COPY cypher_queries.py /app/
# Set environment variable 'chunk_size' and 'number_of_insertions'
ENV db_port='7687'
ENV db_host='localhost'
ENV interval=15
CMD [ "python","cypher_queries.py" ]
\ No newline at end of file
from neo4j import GraphDatabase
import schedule
import time
import logging
import os
PORT = int(os.getenv('db_port',default=7687))
HOST = str(os.getenv('db_host',default='localhost'))
INTERVAL = int(os.getenv('interval',default=15))
log_directory = "log"
queries_file_path = str(os.getenv('query_file_path',default="C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\eval\\query\\neo4j\\queries.txt"))
# Connection details
uri = f"bolt://{HOST}:{PORT}"
auth = ("","") # Use this for no authentication, or (USERNAME, PASSWORD) tuple for authentication
cypher_queries = {
"anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
"desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
"path": "MATCH path = shortestPath((a:Subject)-[*]-(b:Event)) where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69' RETURN path;",
"2-hop": "MATCH (startNode: Host)-[*1..2]-(neighborhood) RETURN neighborhood;"
}
def load_queries_from_file(file_path):
cypher_queries = {}
with open(file_path, 'r') as file:
for line in file:
if '|' in line: # Simple validation to ignore malformed lines
key, query = line.strip().split('|', 1)
cypher_queries[key] = query
return cypher_queries
# Function to execute a specific Cypher query and log response time
def execute_and_log_query(query_key, query):
start_time = time.time()
with driver.session() as session:
result = session.run(query)
summary = result.consume()
end_time = time.time()
# Check if the attributes are None and set them to 0 if they are
result_available_after = summary.result_available_after if summary.result_available_after is not None else 0
result_consumed_after = summary.result_consumed_after if summary.result_consumed_after is not None else 0
execution_time = result_available_after + result_consumed_after
logging.info(f"Results for Query {query_key}:")
logging.info(f"Python executed in: {end_time - start_time}s")
if execution_time == 0:
logging.warning(f"No internal DBMS metric available")
else:
logging.info(f"DBMS answered in: {execution_time} ms\n")
# Function to schedule and execute all queries
def schedule_and_execute_queries():
for query_key, query in cypher_queries.items():
execute_and_log_query(query_key=query_key, query=query)
# Example scheduling: execute immediately then every minute for demonstration
# TODO: Create the schedule here
schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(15).minutes.do(execute_and_log_query, query_key=query_key, query=query)
# schedule.every(30).minutes.do()
while True:
schedule.run_pending()
time.sleep(1)
# create log directory
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Setup logging to file
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
filename=f"{log_directory}/{formatted_time}_query_log.log",
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
cypher_queries = load_queries_from_file(queries_file_path)
# Initialize the Neo4j connection
driver = GraphDatabase.driver(uri=uri, auth=auth,encrypted=False)
logging.info(f"Connected to db... {uri}")
# Main execution block with graceful shutdown
if __name__ == "__main__":
try:
schedule_and_execute_queries()
except KeyboardInterrupt:
logging.info("Script interrupted by user")
finally:
driver.close()
logging.info("Database connection closed.")
......@@ -83,7 +83,7 @@ services:
- ./memgraph/mg_lib:/var/lib/memgraph
- ./memgraph/mg_log:/var/log/memgraph:rw
- ./memgraph/mg_etc:/etc/memgraph
# entrypoint: ["/usr/bin/supervisord"]
entrypoint: ["/usr/bin/supervisord"]
healthcheck:
test: ["CMD", "mgconsole"]
interval: 30s
......@@ -267,7 +267,7 @@ services:
- PGUSER=postgres
- PGPASSWORD=postgres
- PGPORT=5432
- interval=1
- interval=15
- query_file_path=/app/queries/queries.txt
volumes:
- ./eval/query/pg/log:/app/log
......@@ -279,7 +279,7 @@ services:
container_name: query_neo4j
image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/cypher_queries
environment:
- interval=1
- interval=15
- db_host=neo4j
- db_port=7687
- query_file_path=/app/queries/queries.txt
......@@ -291,12 +291,12 @@ services:
query_ongdb:
container_name: query_ongdb
image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/cypher_queries
image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/ongdb_queries
environment:
- interval=1
- interval=15
- db_host=ongdb
- db_port=7687
- query_file_path='/app/queries/queries.txt'
- query_file_path=/app/queries/queries.txt
volumes:
- ./eval/query/ongdb/log:/app/log
- ./eval/query/ongdb/queries:/app/queries
......@@ -307,7 +307,7 @@ services:
container_name: query_memgraph
image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/cypher_queries
environment:
- interval=1
- interval=15
- db_host=memgraph
- db_port=7687
- query_file_path=/app/queries/queries.txt
......@@ -326,6 +326,9 @@ volumes:
prometheus-data:
driver: local
neo4j:
mg_lib:
mg_log:
mg_etc:
networks:
postgres:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment