Skip to content
Snippets Groups Projects
Commit b36847bd authored by Sven-Ove Hänsel
Browse files

update infrastructure for queries

parent 988e5f8b
No related branches found
No related tags found
No related merge requests found
# Use an official Python runtime as a base image
FROM python:3.8
# Set working directory in the container
WORKDIR /app
# Install dependencies: the legacy 1.x Neo4j Bolt driver (pinned — the script
# relies on its summary timing fields) and the `schedule` job runner.
RUN pip install neo4j==1.7.0
RUN pip install schedule
# NOTE(review): DOCKERIZE_VERSION is set but dockerize is never downloaded or
# invoked in this image — confirm whether this ENV can be removed.
ENV DOCKERIZE_VERSION v0.6.1
# Copy the Python script into the container
COPY cdm_cypher_queries.py /app/
# Default connection settings read by the script (db_host/db_port) and the
# scheduling interval in minutes; override at runtime as needed.
ENV db_port='7687'
ENV db_host='localhost'
ENV interval=15
CMD [ "python","cdm_cypher_queries.py" ]
\ No newline at end of file
from neo4j import GraphDatabase
import schedule
import time
import logging
import os
# Runtime configuration, overridable through environment variables.
PORT = int(os.getenv('db_port', 7687))
HOST = str(os.getenv('db_host', 'localhost'))
INTERVAL = int(os.getenv('interval', 15))

# Directory where query log files are written.
log_directory = "log"

# Connection details for the Bolt endpoint.
uri = f"bolt://{HOST}:{PORT}"
auth = ("", "")  # no authentication; use a (USERNAME, PASSWORD) tuple otherwise

# Named benchmark queries: ancestors, descendants, shortest path, 2-hop neighborhood.
cypher_queries = {
    "anc": "MATCH path=(node:Host)-[*]->(ancestor) RETURN ancestor, length(path) AS distance_upstream ORDER BY distance_upstream;",
    "desc": "MATCH path=(node:Host)<-[*]-(descendant) RETURN descendant, length(path) AS distance_downstream ORDER BY distance_downstream;",
    "path": "MATCH path = shortestPath((a:Subject where a._uuid='0CF2BB3E-36B8-11E8-BF66-D9AA8AFF4A69')-[*]-(b:Event)) RETURN path;",
    "2-hop": "MATCH (startNode: Host)-[*1..2]-(neighborhood) RETURN neighborhood;"
}
# Function to execute a specific Cypher query and log response time
# Function to execute a specific Cypher query and log response time
def execute_and_log_query(query_key, query):
    """Run one Cypher query and log both client-side and server-side timings."""
    started = time.time()
    with driver.session() as session:
        # consume() drains the result stream and yields the server summary.
        summary = session.run(query).consume()
    elapsed = time.time() - started
    # neo4j 1.x summaries report server-side timings in milliseconds.
    server_ms = summary.result_available_after + summary.result_consumed_after
    logging.info(f"Results for Query {query_key}:")
    logging.info(f"Python executed in: {elapsed}s")
    logging.info(f"DBMS answered in: {server_ms} ms\n")
# Function to schedule and execute all queries
# Function to schedule and execute all queries
def schedule_and_execute_queries():
    """Run every query once immediately, then re-run each every INTERVAL minutes."""
    for key, stmt in cypher_queries.items():
        execute_and_log_query(query_key=key, query=stmt)
        # Register this query for periodic re-execution.
        schedule.every(INTERVAL).minutes.do(execute_and_log_query, query_key=key, query=stmt)
    # Block forever, dispatching due jobs roughly once per second.
    while True:
        schedule.run_pending()
        time.sleep(1)
# create log directory
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Setup logging to file
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(level=logging.INFO,
filename=f"{log_directory}/{formatted_time}_query_log.log",
filemode='a', # Append to the log file if it exists, 'w' to overwrite
format='%(asctime)s - %(levelname)s - %(message)s')
# Initialize the Neo4j connection
driver = GraphDatabase.driver(uri=uri, auth=auth, encrypted=False)
logging.info(f"Connected to db... {uri}")
# Main execution block with graceful shutdown
if __name__ == "__main__":
try:
schedule_and_execute_queries()
except KeyboardInterrupt:
logging.info("Script interrupted by user")
finally:
driver.close()
logging.info("Database connection closed.")
# Use an official Python runtime as a base image
FROM python:3
# Set working directory in the container
WORKDIR /app
# install dependencies
RUN pip install psycopg2
RUN pip install schedule
# NOTE(review): DOCKERIZE_VERSION is set but dockerize is never installed or
# used in this image — confirm whether this ENV can be removed.
ENV DOCKERIZE_VERSION v0.6.1
# Copy the Python script into the container
COPY cdm_sql_queries.py /app/
# Default PostgreSQL connection settings read by cdm_sql_queries.py; override
# at runtime (e.g. via docker-compose). The previous values were wrong:
# PGHOST was a port number and every other field was 'localhost'. These now
# match the script's own fallback defaults.
ENV PGHOST='localhost'
ENV PGDATABASE='postgres'
ENV PGUSER='postgres'
ENV PGPASSWORD='postgres'
ENV PGPORT='5432'
# Minutes between scheduled query runs.
ENV interval=15
CMD [ "python","cdm_sql_queries.py" ]
\ No newline at end of file
import psycopg2
import time
import os
import logging
import schedule
# Where per-run log files are written.
log_directory = "log"

# PostgreSQL connection settings, read from the standard PG* environment
# variables with sensible local-development fallbacks.
hostname = os.getenv('PGHOST', 'localhost')
database = os.getenv('PGDATABASE', 'postgres')
username = os.getenv('PGUSER', 'postgres')
password = os.getenv('PGPASSWORD', 'postgres')
port_id = os.getenv('PGPORT', '5432')

# Minutes between scheduled query runs.
interval = int(os.getenv('interval', 15))
def connect_to_db():
    """Open a PostgreSQL connection; return it, or None if connecting fails."""
    try:
        # Establish a connection using the module-level PG* settings.
        conn = psycopg2.connect(
            host=hostname,
            dbname=database,
            user=username,
            password=password,
            port=port_id,
        )
    except psycopg2.Error as e:
        logging.error(f"Error connecting to PostgreSQL database: {e}")
        return None
    logging.info(f"Connected to db {database} at {hostname}:{port_id}")
    return conn
def execute_queries(conn):
try:
# Create a cursor object
cur = conn.cursor()
# Dictionary of SQL queries to execute
queries = {
"ancestors": """
WITH RECURSIVE AncestorCTE AS (
SELECT n.node_no, e.dest, n.type, 1 AS Level
FROM node_list n
JOIN edge_list e ON n.uuid = e.source
WHERE n.node_no = 30 -- Replace with the desired starting node_no
UNION ALL
SELECT n.node_no, e.dest, n.type, d.Level + 1
FROM node_list n
JOIN edge_list e ON n.uuid = e.source
JOIN AncestorCTE d ON e.dest = n.uuid
WHERE d.Level <5 -- Replace with the desired level
)
SELECT DISTINCT node_no, dest, type, Level FROM AncestorCTE;
""",
"descendant": """
WITH RECURSIVE DescendantCTE AS (
SELECT n.node_no, e.source, 1 AS Level
FROM node_list n
JOIN edge_list e ON n.uuid = e.dest
WHERE n.node_no = 1 -- Replace with the desired starting node_no
UNION ALL
SELECT n.node_no, e.source, a.Level + 1
FROM node_list n
JOIN edge_list e ON n.uuid = e.dest
JOIN DescendantCTE a ON e.source = n.uuid
WHERE a.Level < 3 -- Replace with the desired level
)
SELECT DISTINCT node_no, source, Level FROM DescendantCTE;
""",
"path": """
WITH RECURSIVE path_cte AS (
-- Anchor member initialization
SELECT
el.source AS start_node,
el.dest AS end_node,
ARRAY[el.source::varchar] AS path, -- Cast to varchar to ensure type consistency
1 AS depth -- Keep track of the depth to prevent infinite loops
FROM
edge_list el
WHERE
el.source = '5CC868CD-FF30-5E2B-BB74-6C5B474A62B2' -- Replace with the UUID of the starting node
UNION ALL
-- Recursive member definition
SELECT
p.start_node,
el.dest,
p.path || el.dest::varchar, -- Ensure el.dest is cast to varchar
p.depth + 1
FROM
edge_list el
JOIN
path_cte p ON el.source = p.end_node
WHERE
NOT (el.dest = ANY(p.path)) -- Prevent cycles by ensuring we don't revisit nodes
AND p.depth < 100 -- Example limit to prevent infinite recursion
)
-- Final query to select the path
SELECT distinct
start_node, end_node, path, depth
FROM
path_cte
WHERE
end_node = '83C8ED1F-5045-DBCD-B39F-918F0DF4F851' -- Replace with the UUID of the ending node
ORDER BY
depth ASC -- Optional: orders by the shortest path
--LIMIT 1 -- Optionally limit to one path if multiple paths exist
;
""",
"k-hop": """Select 1;
"""
}
for key, query in queries.items():
start_time = time.time()
cur.execute(query)
execution_time = time.time() - start_time
rows = cur.fetchall()
logging.info(f"Results of '{key}' query:")
logging.info(f"Query execution time for '{key}': {execution_time:.4f} seconds")
logging.info(f"Rows fetched for '{key}': {cur.rowcount}\n")
cur.close()
except psycopg2.Error as e:
logging.error(f"Error executing queries: {e}")
def job():
    """One scheduled run: connect, execute all benchmark queries, disconnect."""
    conn = connect_to_db()
    if conn is None:
        # Connection failure was already logged; skip this run.
        return
    execute_queries(conn)
    conn.close()
# Make sure the log directory exists before logging to a file inside it.
os.makedirs(log_directory, exist_ok=True)

# Log to a timestamped file; append if it already exists.
formatted_time = time.strftime("%Y-%m-%d_%H-%M-%S")
logging.basicConfig(
    level=logging.INFO,
    filename=f"{log_directory}/{formatted_time}_query_log.log",
    filemode='a',
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Run once immediately, then re-run every `interval` minutes.
logging.info("Starting...")
job()
schedule.every(interval).minutes.do(job)

if __name__ == "__main__":
    # Dispatch due jobs roughly once per second, forever.
    while True:
        schedule.run_pending()
        time.sleep(1)
@@ -257,6 +257,61 @@ services:
      sub_mem:
        condition: service_started
# infrastructure for testing: one query-runner container per database backend
  query_pg:
    container_name: query_pg
    image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/sql_queries
    environment:
      - PGHOST=postgres
      - PGDATABASE=postgres
      - PGUSER=postgres
      - PGPASSWORD=postgres
      - PGPORT=5432
      - interval=1
    volumes:
      - ./eval/query/pg/log:/app/log
    depends_on:
      - pub
  query_neo4j:
    container_name: query_neo4j
    image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/cypher_queries
    environment:
      - interval=1
      - db_host=neo4j
      - db_port=7687
    volumes:
      - ./eval/query/neo4j/log:/app/log
    depends_on:
      - pub
  query_ongdb:
    container_name: query_ongdb
    image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/cypher_queries
    environment:
      - interval=1
      - db_host=ongdb
      - db_port=7687
    volumes:
      - ./eval/query/ongdb/log:/app/log
    depends_on:
      - pub
  query_memgraph:
    container_name: query_memgraph
    image: lab.it.hs-hannover.de:4567/cwy-p8d-u1/ma_code/cypher_queries
    environment:
      - interval=1
      - db_host=memgraph
      - db_port=7687
    volumes:
      - ./eval/query/memgraph/log:/app/log
    depends_on:
      - pub
volumes:
  postgres:
  pgadmin:
...
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment