Commit 6886f1c3 authored by julian

added functionality to start_tests.py to save results from multiple tests, removed unneeded query code
parent 6a259c9e
@@ -2,15 +2,14 @@
 PGUSER=postgres
 PGDATABASE=postgres
 PGHOST=postgres
-PGPASSWORD=postgres
 # pub-sub
 MOS_HOST=mosquitto
 MOS_TOPIC=ta1-cadets-e3-official
 # pub
-LINES_PER_SECOND=1
-BATCH_SIZE=1
+LINES_PER_SECOND=1500
+BATCH_SIZE=1500
 # query
 QUERY_INTERVAL=1
@@ -22,5 +21,3 @@ PGADMIN_DEFAULT_EMAIL=admin@abc.de
 PGADMIN_DEFAULT_PASSWORD=admin
 PGADMIN_DISABLE_POSTFIX=True
-# grafana
-DS_PROMETHEUS=prometheus_src
@@ -22,11 +22,11 @@ services:
       - PGHOST
       - PGDATABASE
       - PGUSER
-      - PGPASSFILE=/run/secrets/postgres_pass
+      - PGPASSFILE=/run/secrets/pgpass
     profiles:
       - experiment
     secrets:
-      - postgres_pass
+      - pgpass
     depends_on:
       mosquitto:
         condition: service_started
@@ -40,17 +40,17 @@ services:
       - PGHOST
       - PGDATABASE
       - PGUSER
-      - PGPASSFILE=/run/secrets/postgres_pass
+      - PGPASSFILE=/run/secrets/pgpass
       - QUERY_INTERVAL
     profiles:
       - experiment
     secrets:
-      - postgres_pass
+      - pgpass
     depends_on:
       - sub_pg
   mosquitto:
-    image: eclipse-mosquitto
+    image: eclipse-mosquitto:latest
     hostname: $MOS_HOST
     profiles:
       - experiment
@@ -82,7 +82,7 @@ services:
       - postgres_db_pass
   pgadmin:
-    image: dpage/pgadmin4
+    image: dpage/pgadmin4:latest
     environment:
       - PGADMIN_CONFIG_SERVER_MODE
       - PGADMIN_CONFIG_MASTER_PASSWORD_REQUIRED
@@ -96,12 +96,6 @@ services:
     configs:
       - source: pgadmin_server_conf
         target: /pgadmin4/servers.json
-    secrets:
-      - postgres_pass
-    depends_on:
-      postgres:
-        condition: service_healthy
-        restart: true
 configs:
   postgres_conf:
@@ -114,7 +108,7 @@ configs:
     file: ./data/ta1-cadets-e3-official.zip
 secrets:
-  postgres_pass:
+  pgpass:
     file: ./.pgpass
   postgres_db_pass:
-    file: ./postgres_pass.txt
+    file: ./postgres_pass
File moved
+from datetime import datetime
 from signal import SIGINT, SIGTERM, signal
-from typing import Sequence, Tuple, Any, TypeAlias, Deque
+from typing import Sequence, Tuple, Any, Deque
 from time import strftime, perf_counter, sleep
 from os import environ, path, makedirs
 from logging import info, basicConfig, INFO
-from psycopg import Cursor, IsolationLevel, connect, ClientCursor
+from psycopg import connect
 from datetime import timedelta
 from collections import deque
 from statistics import fmean
 import schedule
 import csv
-query: TypeAlias = Tuple[str, Sequence | None]
-def write_logs(log_path: str, log_data: Tuple[str, float, float, float, int]):
-    query, _, exec_time, exec_time_py, row_count = log_data
-    info(f"Results of '{query}' query:")
-    info(f"DBMS query '{query}' executed in {exec_time:.4f} ms")
-    info(f"Query '{query}' ran for {exec_time_py:.4f} seconds in python")
-    info(f"Rows fetched for '{query}': {row_count}\n")
-    with open(log_path, mode="a", newline="") as log_file:
-        csv.writer(log_file).writerow(log_data)
-def execute_queries(cur: Cursor[Any], queries: Sequence[query], log_path: str):
-    for query, params in queries:
-        start = perf_counter()
-        analyze_query = f"EXPLAIN (ANALYZE, FORMAT JSON) EXECUTE {query};"
-        result: Any = cur.execute(analyze_query, params).fetchone()
-        exec_time_py = perf_counter() - start
-        plan = result[0][0]["Plan"]
-        exec_time = plan["Actual Total Time"]
-        row_count = plan["Actual Rows"]
-        write_logs(log_path, (query, start, exec_time, exec_time_py, row_count))
-def job(queries: Sequence[query], log_path: str):
-    with open("/app/queries.sql") as prepared:
-        prepared_str = prepared.read()  # TODO read file only once
-    with connect(cursor_factory=ClientCursor) as conn:
-        conn.set_read_only(True)
-        conn.set_isolation_level(IsolationLevel.SERIALIZABLE)
-        with conn.cursor() as cur, conn.transaction():
-            cur.execute(prepared_str)
-            execute_queries(cur, queries, log_path)
 vertex_counts: Deque[Tuple[int, float]] = deque(maxlen=10)
-def perf_job() -> None:
+def perf_job(path: str):
     with connect() as conn, conn.cursor() as cur:
         result: Any = cur.execute("SELECT COUNT(*) FROM vertex").fetchone()
+        with open(csv_path, "a") as file:
+            csv.writer(file).writerow(
+                (datetime.now().strftime("%H:%M:%S.%f"), result[0])
+            )
         vertex_counts.appendleft((result[0], perf_counter()))
         perf_list = []
         for i in range(len(vertex_counts) - 1):
@@ -74,44 +44,19 @@ if __name__ == "__main__":
     signal(SIGTERM, exit_gracefully)
     signal(SIGINT, exit_gracefully)
-    interval = int(environ["QUERY_INTERVAL"])
-    log_directory = "log"
-    log_path = f"{log_directory}/{strftime('%Y-%m-%d_%H-%M-%S')}_query_log"
-    csv_name = f"{log_path}.csv"
-    queries = (
-        ("count_vertices", None),
-        # ("get_ancestors(%s, %s)", (1, 5)),
-        # ("get_descendants(%s, %s)", (1, 5)),
-        # (
-        #     "shortest_path(%s, %s, %s)",
-        #     (
-        #         "A6A7C956-0132-5506-96D1-2A7DE97CB400",
-        #         "8DA367BF-36C2-11E8-BF66-D9AA8AFF4A69",
-        #         1,
-        #     ),
-        # ),
-        # ("two_hop_old(%s)", (113,)),
-        # ("two_hop_new(%s)", (113,)),
-    )
-    csv_header = (
-        "Query Key",
-        "Start Time",
-        "Execution Time (ms)",
-        "Total Time (s)",
-        "Rows Fetched",
-    )
-    if not path.exists(log_directory):
-        makedirs(log_directory)
-    with open(csv_name, "w") as file:
-        csv.writer(file).writerow(csv_header)
-    basicConfig(level=INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    basicConfig(level=INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    log_dir = "log"
+    csv_path = f"{log_dir}/{strftime("%Y-%m-%d_%H-%M-%S")}_perf_data.csv"
+    if not path.exists(log_dir):
+        makedirs(log_dir)
+    with open(csv_path, "w") as file:
+        csv.writer(file).writerow(("TIMESTAMP", "VERTEX_COUNT"))
     info("Starting...")
-    end_time = timedelta(hours=2)
-    # schedule.every(interval).minutes.until(end_time).do(job, queries, csv_name).run()
-    schedule.every(5).seconds.until(end_time).do(perf_job)
+    end_time = timedelta(hours=1)
+    schedule.every(5).seconds.until(end_time).do(perf_job, csv_path)
     while (sleep_time := schedule.idle_seconds()) is not None and not interrupted:
         sleep(max(sleep_time, 0))
         schedule.run_pending()
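The hunk cuts perf_job off inside its rate loop. Going by the (count, perf_counter()) samples kept in vertex_counts and the fmean import, the truncated part presumably averages vertex-count deltas over elapsed time. A minimal standalone sketch of that calculation with made-up sample values, not the committed code:

from collections import deque
from statistics import fmean
from time import perf_counter

# Newest sample first, mirroring vertex_counts.appendleft() in perf_job.
now = perf_counter()
samples = deque([(4500, now), (3000, now - 5), (1500, now - 10)], maxlen=10)

# Per-interval ingest rate between consecutive samples, then the average.
rates = [
    (samples[i][0] - samples[i + 1][0]) / (samples[i][1] - samples[i + 1][1])
    for i in range(len(samples) - 1)
]
print(f"~{fmean(rates):.1f} vertices/s")  # ~300.0 vertices/s for the sample data above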
 {
     "Servers": {
         "1": {
-            "Name": "cadets",
+            "Name": "postgres",
             "Group": "Servers",
+            "Host": "postgres",
             "Port": 5432,
+            "MaintenanceDB": "postgres",
             "Username": "postgres",
-            "PassFile": "/run/secrets/postgres_pass",
-            "Host": "postgres",
-            "SSLMode": "prefer",
-            "MaintenanceDB": "postgres"
+            "TunnelPort": "22",
+            "KerberosAuthentication": false
         }
     }
 }
\ No newline at end of file
-from time import sleep
 from os import environ, makedirs, path
-from subprocess import Popen, run
+from subprocess import run
 from typing import Sequence
-def run_experiment(compose_files: Sequence[str]):
-    print("Start compose environment...")
-    environ["COMPOSE_PROFILES"] = "experiment,inspect"
-    Popen(("docker", "down")).wait(30)
+def run_experiment(compose_files: Sequence[str], env: dict = {}):
+    environ["COMPOSE_PROFILES"] = "experiment"
+    for key, val in env.items():
+        environ[key] = str(val)
+    run(("docker", "compose", "down"))
     f = ["-f"] * len(compose_files)
     interleaved = (val for pair in zip(f, compose_files) for val in pair)
+    print("Start compose environment...")
     run(("docker", "compose", *interleaved, "up", "-d"))
-    sleep_time = 10
-    print(f"sleep for {sleep_time/60} minutes")
-    sleep(sleep_time)
-    run(("docker", "compose", "cp", "query_pg:log/*", "result/"))
+    print("waiting for data run to finish")
+    run(("docker", "compose", "wait", "query_pg"))
+    print("data run finished, saving result")
+    run(("docker", "compose", "cp", "query_pg:/app/log/.", "result/"))
     run(("docker", "compose", "down"))
@@ -24,4 +26,6 @@ if __name__ == "__main__":
     result_dir = "result"
     if not path.exists(result_dir):
         makedirs(result_dir)
-    run_experiment(("compose.yml", "edge-id-empty.yml", "index.yml"))
+    run_experiment(
+        ("compose.yml", "edge-id-empty.yml", "index.yml"), {"LINES_PER_SECOND": 3000}
+    )