Commit 439b9dda authored by julian


Removed mosquitto; instead, the inserter now reads the data directly from the archive so it can insert at the maximum possible speed.
parent d75a086a
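In short, the publisher/subscriber pair around the MQTT broker is gone; the inserter now opens the compressed archive itself, batches the JSON lines, and bulk-inserts them with psycopg. A minimal sketch of that flow (simplified from the committed pg_insert.py shown below; the schema-header stripping, signal handling, and optional edge inserts are omitted, and connection settings are taken from the usual PG* environment variables):

# Sketch of the new direct-insert loop (simplified; see pg_insert.py in the diff below).
import json
from typing import Iterable, List, Sequence
from zipfile import ZipFile

import psycopg  # picks up PGHOST/PGDATABASE/PGUSER/PGPASSFILE from the environment


def batches(path: str, size: int) -> Iterable[Sequence[bytes]]:
    """Yield lists of raw JSON lines read straight out of the zip archive."""
    batch: List[bytes] = []
    with ZipFile(path) as archive:
        for name in archive.namelist():
            with archive.open(name) as member:
                for line in member:
                    batch.append(line)
                    if len(batch) >= size:
                        yield batch
                        batch = []
    if batch:
        yield batch


with psycopg.connect() as connection:
    for batch in batches("/data.zip", 5000):
        rows = [json.loads(line) for line in batch]  # the real script first strips a schema header prefix
        params = []
        for row in rows:
            node_type = next(iter(row["datum"]))  # the single key under "datum" names the node type
            params.append((node_type, json.dumps(row["datum"][node_type])))
        with connection.cursor() as cursor:
            cursor.executemany("INSERT INTO vertex (type, content) VALUES (%s, %s)", params)
        connection.commit()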
 services:
-  pub:
-    build: streaming/pub
-    environment:
-      - MOS_HOST
-      - MOS_TOPIC
-      - LINES_PER_SECOND
-      - BATCH_SIZE
-    profiles:
-      - experiment
-    configs:
-      - data.zip
-    depends_on:
-      - mosquitto
-      - pg_inserter
   pg_inserter:
-    build: streaming/sub/postgres
+    build: insert
     environment:
-      - MOS_HOST
-      - MOS_TOPIC
       - PGHOST
       - PGDATABASE
       - PGUSER
       - EDGE_INSERTS
       - UUID_EDGES
+      - BATCH_SIZE
       - PGPASSFILE=/run/secrets/pgpass
     profiles:
       - experiment
+    configs:
+      - data.zip
     secrets:
       - pgpass
     depends_on:
-      mosquitto:
-        condition: service_started
       postgres:
         condition: service_healthy
         restart: true
-  query_pg:
-    build: queries
+  pg_query:
+    build: query
     environment:
       - PGHOST
       - PGDATABASE
@@ -51,15 +35,6 @@ services:
     depends_on:
       - pg_inserter
-  mosquitto:
-    image: eclipse-mosquitto:latest
-    hostname: $MOS_HOST
-    profiles:
-      - experiment
-    configs:
-      - source: mosquitto_conf
-        target: /mosquitto/config/mosquitto.conf
   postgres:
     build: postgres
     command: -c config_file=/postgres_conf
...
FROM python:3
WORKDIR /app
RUN pip install "psycopg[binary,pool]"
COPY pg_insert.py ./
# ENV PGHOST= PGDATABASE= PGUSER= PGPASSWORD= BATCH_SIZE= EDGE_INSERTS= UUID_EDGES=
CMD ["python", "-u", "pg_insert.py"]
 from signal import SIGINT, SIGTERM, signal
-from paho.mqtt.client import Client, CallbackAPIVersion, MQTTMessage, ReasonCode
-from typing import Tuple, Any, Iterable, Sequence, TypeAlias, Final
+from zipfile import ZIP_LZMA, ZipFile
+from typing import IO, List, Tuple, Any, Iterable, Sequence, TypeAlias, Final
 from os import environ, getenv
-from logging import basicConfig, INFO, DEBUG, info, debug, critical
-import sys
+from logging import basicConfig, INFO, info
 import json
 import psycopg
@@ -40,17 +39,17 @@ def first_key(obj: dict) -> str:
 def unpack_node(node_entry: dict) -> Tuple[str, str, dict]:
     node = node_entry["datum"]
     node_type = first_key(node)
-    node = node[node_type]
-    uuid = node["uuid"]
-    return (uuid, node_type, node)
+    content = node[node_type]
+    uuid = content["uuid"]
+    return (uuid, node_type, content)


-def format_vertex_query(node_list: Sequence[Tuple[str, str, dict]]) -> Parameter_Query:
+def format_node_query(node_list: Sequence[Tuple[str, str, dict]]) -> Parameter_Query:
     node_json = ((type, json.dumps(node)) for _, type, node in node_list)
     return ("INSERT INTO vertex (type, content) VALUES(%s, %s)", node_json)


-def try_keys(obj: dict, keys: Sequence[str]) -> str | None:
+def try_path(obj: dict, keys: Sequence[str]) -> str | None:
     val: Any = obj
     for key in keys:
         val = val.get(key)
@@ -60,8 +59,8 @@ def try_keys(obj: dict, keys: Sequence[str]) -> str | None:
 def get_edges(uuid: str, type: str, content: dict) -> Iterable[Tuple[str, str, str]]:
-    for edge_type, edge_keys in NODE_TYPES[type]:
-        if (dest := try_keys(content, edge_keys)) is not None:
+    for edge_type, edge_path in NODE_TYPES[type]:
+        if (dest := try_path(content, edge_path)) is not None:
             yield (uuid, dest, edge_type)
@@ -83,66 +82,56 @@ def format_edge_id(nodes: Sequence[Tuple[str, str, dict]]) -> Tuple[str, Sequenc
     return (query_str, [val for edge in edges for val in edge])


-class Postgres_inserter:
-    def __init__(self, db_conf={}, client_conf={}, topic="", broker_conf={}):
-        self.broker_conf = broker_conf
-        self.topic = topic
-        self.db_conn = psycopg.connect(**db_conf)
-        info("Connected to DB.")
-        self.client = Client(**client_conf)
-        self.client.on_connect = self.on_connect
-        self.client.on_message = self.on_message
-        signal(SIGINT, self.exit_gracefully)
-        signal(SIGTERM, self.exit_gracefully)
-
-    def start(self):
-        self.client.connect(**self.broker_conf)
-        self.client.loop_forever()
-
-    def on_message(self, client: Client, userdata: Any, message: MQTTMessage):
-        json_list = json.loads(message.payload.replace(HEADER_SCHEMA, b""))
-        nodes = [unpack_node(entry) for entry in json_list]
-        with self.db_conn.cursor() as cursor:
-            cursor.executemany(*format_vertex_query(nodes))
-            if len(getenv("EDGE_INSERTS", "")) > 0:
-                if len(getenv("UUID_EDGES", "")) > 0:
-                    cursor.executemany(*format_edge_uuid(nodes))
-                else:
-                    query, params = format_edge_id(nodes)
-                    if len(params) > 0:
-                        cursor.execute(query, params)
-        self.db_conn.commit()
-
-    def on_connect(self, client: Client, _2, _3, reason_code: ReasonCode, _4):
-        if not reason_code.is_failure:
-            client.subscribe(self.topic, qos=2)
-            info("Connected to broker.")
-        else:
-            critical(f"Connection to broker failed: {reason_code.getName()}")
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, type, value, tb):
-        if hasattr(self, "client"):
-            self.client.disconnect()
-        if hasattr(self, "db_conn"):
-            self.db_conn.close()
-
-    def exit_gracefully(self, signum, frame):
-        sys.exit(0)
+def zipped_files(path: str) -> Iterable[IO[bytes]]:
+    with ZipFile(path, "r", ZIP_LZMA) as zip_file:
+        for name in zip_file.namelist():
+            with zip_file.open(name) as file:
+                yield file
+
+
+def read_zip(path: str, batch_size: int) -> Iterable[Sequence[bytes]]:
+    lines: List[bytes] = []
+    for file in zipped_files(path):
+        while (line := next(file, None)) is not None:
+            lines.append(line)
+            if len(lines) >= batch_size:
+                yield lines
+                lines.clear()
+    if len(lines) > 0:
+        yield lines
+
+
+interrupted = False
+
+
+def exit_gracefully(signum, frame):
+    global interrupted
+    interrupted = True


 if __name__ == "__main__":
-    basicConfig(level=DEBUG)
-    configuration: dict = {
-        "client_conf": {
-            "callback_api_version": CallbackAPIVersion.VERSION2,
-            "client_id": "postgres",
-            "clean_session": False,
-        },
-        "topic": environ["MOS_TOPIC"],
-        "broker_conf": {"host": environ["MOS_HOST"], "keepalive": 3600 * 2},
-    }
-    with Postgres_inserter(**configuration) as subscriber:
-        subscriber.start()
+    signal(SIGTERM, exit_gracefully)
+    signal(SIGINT, exit_gracefully)
+    basicConfig(level=INFO)
+    batch_size = max(int(environ["BATCH_SIZE"]), 1)
+    file = "/data.zip"
+    try:
+        with psycopg.connect() as connection:
+            for batch in read_zip(file, batch_size):
+                if interrupted:
+                    break
+                batch_short = (line.replace(HEADER_SCHEMA, b"") for line in batch)
+                json_iter = (json.loads(line) for line in batch_short)
+                nodes = [unpack_node(entry) for entry in json_iter]
+                with connection.cursor() as cursor:
+                    cursor.executemany(*format_node_query(nodes))
+                    if len(getenv("EDGE_INSERTS", "")) > 0:
+                        if len(getenv("UUID_EDGES", "")) > 0:
+                            cursor.executemany(*format_edge_uuid(nodes))
+                        else:
+                            query, params = format_edge_id(nodes)
+                            if len(params) > 0:
+                                cursor.execute(query, params)
+                connection.commit()
+    finally:
+        connection.close()
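To make the expected record shape concrete, here is a tiny worked example against unpack_node and format_node_query from the file above; the record itself is synthetic, invented purely for illustration:

# Assuming unpack_node and format_node_query from pg_insert.py are in scope.
entry = {"datum": {"SomeNodeType": {"uuid": "0000-aaaa", "size": 1}}}

uuid, node_type, content = unpack_node(entry)
# uuid == "0000-aaaa", node_type == "SomeNodeType", content == {"uuid": "0000-aaaa", "size": 1}

query, params = format_node_query([(uuid, node_type, content)])
# query == "INSERT INTO vertex (type, content) VALUES(%s, %s)"
# params yields one tuple: ("SomeNodeType", '{"uuid": "0000-aaaa", "size": 1}')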
@@ -16,10 +16,10 @@ def run_experiment(name: str, compose_files: Sequence[str], env: dict = {}):
     run(("docker", "compose", *interleaved, "up", "-d", "--build"))
     info(f"waiting for {name} to finish")
-    run(("docker", "compose", "wait", "query_pg"))
+    run(("docker", "compose", "wait", "pg_query"))
     info(f"{name} finished, saving result")
-    run(("docker", "compose", "cp", "query_pg:/app/log/data.csv", f"result/{name}.csv"))
+    run(("docker", "compose", "cp", "pg_query:/app/log/data.csv", f"result/{name}.csv"))
     run(("docker", "compose", "down"))
@@ -30,104 +30,54 @@ if __name__ == "__main__":
     makedirs(result_dir)
     experiments: Sequence[dict] = (
         {
-            "name": "experiment_batch1_10000",
+            "name": "experiment_batch1",
             "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 10000, "BATCH_SIZE": 1},
+            "env": {"BATCH_SIZE": 1},
         },
         {
-            "name": "experiment_batch10_10000",
+            "name": "experiment_batch10",
             "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 10000, "BATCH_SIZE": 10},
+            "env": {"BATCH_SIZE": 10},
         },
         {
-            "name": "experiment_batch10_20000",
+            "name": "experiment_batch100",
             "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 20000, "BATCH_SIZE": 10},
+            "env": {"BATCH_SIZE": 100},
         },
         {
-            "name": "experiment_batch100_10000",
+            "name": "experiment_batch1000",
             "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 10000, "BATCH_SIZE": 100},
-        },
-        {
-            "name": "experiment_batch100_20000",
-            "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 20000, "BATCH_SIZE": 100},
-        },
-        {
-            "name": "experiment_batch1000_10000",
-            "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 10000, "BATCH_SIZE": 1000},
-        },
-        {
-            "name": "experiment_batch1000_20000",
-            "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 20000, "BATCH_SIZE": 1000},
+            "env": {"BATCH_SIZE": 1000},
         },
         {
             "name": "experiment_no_uuid_index",
             "compose_files": ("compose.yml", "edge-id-no-index.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 1500, "BATCH_SIZE": 1500},
-        },
-        {
-            "name": "experiment_default_10000",
-            "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 10000, "BATCH_SIZE": 5000},
-        },
-        {
-            "name": "experiment_default_20000",
-            "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 20000, "BATCH_SIZE": 5000},
+            "env": {"BATCH_SIZE": 5000},
         },
         {
-            "name": "experiment_default_30000",
+            "name": "experiment_default",
             "compose_files": ("compose.yml", "edge-id.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 30000, "BATCH_SIZE": 5000},
+            "env": {"BATCH_SIZE": 5000},
         },
         {
-            "name": "experiment_hash_index_15000",
+            "name": "experiment_hash_index",
             "compose_files": ("compose.yml", "edge-id.yml", "index-hash.yml"),
-            "env": {"LINES_PER_SECOND": 15000, "BATCH_SIZE": 5000},
+            "env": {"BATCH_SIZE": 5000},
         },
         {
-            "name": "experiment_hash_index_25000",
-            "compose_files": ("compose.yml", "edge-id.yml", "index-hash.yml"),
-            "env": {"LINES_PER_SECOND": 25000, "BATCH_SIZE": 5000},
-        },
-        {
-            "name": "experiment_multicolumn_index_15000",
-            "compose_files": ("compose.yml", "edge-id.yml", "index-multi.yml"),
-            "env": {"LINES_PER_SECOND": 15000, "BATCH_SIZE": 5000},
-        },
-        {
-            "name": "experiment_multicolumn_index_25000",
+            "name": "experiment_multicolumn_index",
             "compose_files": ("compose.yml", "edge-id.yml", "index-multi.yml"),
-            "env": {"LINES_PER_SECOND": 25000, "BATCH_SIZE": 5000},
+            "env": {"BATCH_SIZE": 5000},
         },
         {
-            "name": "experiment_triggers_10000",
+            "name": "experiment_triggers",
             "compose_files": ("compose.yml", "edge-id-triggers.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 10000, "BATCH_SIZE": 10000},
-        },
-        {
-            "name": "experiment_triggers_20000",
-            "compose_files": ("compose.yml", "edge-id-triggers.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 20000, "BATCH_SIZE": 20000},
-        },
-        {
-            "name": "experiment_triggers_30000",
-            "compose_files": ("compose.yml", "edge-id-triggers.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 30000, "BATCH_SIZE": 30000},
-        },
-        {
-            "name": "experiment_uuid_10000",
-            "compose_files": ("compose.yml", "edge-uuid.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 10000, "BATCH_SIZE": 10000},
+            "env": {"BATCH_SIZE": 5000},
         },
         {
-            "name": "experiment_uuid_20000",
+            "name": "experiment_uuid",
             "compose_files": ("compose.yml", "edge-uuid.yml", "index.yml"),
-            "env": {"LINES_PER_SECOND": 20000, "BATCH_SIZE": 20000},
+            "env": {"BATCH_SIZE": 5000},
         },
     )
...
log_dest file /mosquitto/log/mosquitto.log
allow_anonymous true
listener 1883 0.0.0.0
max_queued_messages 1000000000
\ No newline at end of file
File moved
@@ -55,7 +55,7 @@ if __name__ == "__main__":
         csv.writer(file).writerow(("TIMESTAMP", "VERTEX_COUNT"))
     info("Starting...")
-    end_time = timedelta(hours=1)
+    end_time = timedelta(seconds=20)
     schedule.every(5).seconds.until(end_time).do(perf_job, csv_path)
     while (sleep_time := schedule.idle_seconds()) is not None and not interrupted:
         sleep(max(sleep_time, 0))
...
File moved
FROM python:3
WORKDIR /app
RUN pip install paho-mqtt schedule
COPY pub_cdm.py ./
# ENV MOS_HOST= MOS_TOPIC= LINES_PER_SECOND= BATCH_SIZE=
CMD ["python", "-u", "pub_cdm.py"]
\ No newline at end of file
from signal import SIGINT, SIGTERM, signal
from typing import Iterator, List, Iterable, Sequence, IO
from paho.mqtt.client import Client, CallbackAPIVersion, ReasonCode
from zipfile import ZipFile, ZIP_LZMA
from os import environ
from time import sleep
from logging import basicConfig, INFO, DEBUG, debug, info, critical
import schedule


def batch_lines(lines: Sequence[str], batch_size: int) -> Sequence[str]:
    return [
        f"[{','.join(lines[pos : pos + batch_size])}]"
        for pos in range(0, len(lines), batch_size)
    ]


def zipped_files(path: str) -> Iterable[IO[bytes]]:
    with ZipFile(path, "r", ZIP_LZMA) as zip_file:
        for name in zip_file.namelist():
            with zip_file.open(name) as file:
                yield file


def read_zip(path: str, line_limit: int) -> Iterable[Sequence[str]]:
    lines: List[str] = []
    for file in zipped_files(path):
        while (line := next(file, None)) is not None:
            lines.append(line.decode("utf-8"))
            if len(lines) >= line_limit:
                yield lines
                lines.clear()
    if len(lines) > 0:
        yield lines


def on_connect(_1, _2, _3, reason_code: ReasonCode, _5):
    if not reason_code.is_failure:
        info("Connected to broker.")
    else:
        critical(f"Connection to broker failed: {reason_code.getName()}")


def publish_lines(client: Client, topic: str, zip: Iterator[Sequence[str]]):
    batches = next(zip)
    for batch in batches:
        client.publish(topic, batch, 2)


interrupted = False


def exit_gracefully(signum, frame):
    global interrupted
    interrupted = True


if __name__ == "__main__":
    signal(SIGTERM, exit_gracefully)
    signal(SIGINT, exit_gracefully)
    basicConfig(level=INFO)
    broker_conf: dict = {"host": environ["MOS_HOST"]}
    topic = environ["MOS_TOPIC"]
    num_lines = int(environ["LINES_PER_SECOND"])
    batch_size = max(min(num_lines, int(environ["BATCH_SIZE"])), 1)
    file = "/data.zip"
    try:
        client = Client(CallbackAPIVersion.VERSION2, "Publisher")
        client.connect(**broker_conf)
        client.loop_start()
        batch = (batch_lines(lines, batch_size) for lines in read_zip(file, num_lines))
        schedule.every().second.do(publish_lines, client, topic, batch)
        while (sleep_time := schedule.idle_seconds()) is not None and not interrupted:
            sleep(max(sleep_time, 0))
            try:
                schedule.run_pending()
            except StopIteration:
                schedule.clear()
                info("done")
    finally:
        try:
            client.disconnect()
        except NameError:
            pass
FROM python:3
WORKDIR /app
RUN pip install paho-mqtt "psycopg[binary,pool]"
COPY sub_pg_cdm.py ./
# ENV PGHOST= PGDATABASE= PGUSER= PGPASSWORD= MOS_HOST= MOS_TOPIC=
CMD ["python", "-u", "sub_pg_cdm.py"]