Skip to content
Snippets Groups Projects
Commit 751bb601 authored by julian's avatar julian
Browse files

implemented manual insert for id edges,

increased default_statistics_target
parent 6886f1c3
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,10 @@ PGUSER=postgres ...@@ -3,6 +3,10 @@ PGUSER=postgres
PGDATABASE=postgres PGDATABASE=postgres
PGHOST=postgres PGHOST=postgres
# sub
EDGE_INSERTS=EDGE_INSERTS
UUID_EDGES=
# pub-sub # pub-sub
MOS_HOST=mosquitto MOS_HOST=mosquitto
MOS_TOPIC=ta1-cadets-e3-official MOS_TOPIC=ta1-cadets-e3-official
......
...@@ -22,6 +22,8 @@ services: ...@@ -22,6 +22,8 @@ services:
- PGHOST - PGHOST
- PGDATABASE - PGDATABASE
- PGUSER - PGUSER
- EDGE_INSERTS
- UUID_EDGES
- PGPASSFILE=/run/secrets/pgpass - PGPASSFILE=/run/secrets/pgpass
profiles: profiles:
- experiment - experiment
...@@ -73,8 +75,10 @@ services: ...@@ -73,8 +75,10 @@ services:
start_period: 5s start_period: 5s
start_interval: 1s start_interval: 1s
volumes: volumes:
- ./postgres/initdb/00-initdb.sql:/docker-entrypoint-initdb.d/00-initdb.sql:ro
- ./postgres/initdb/initdb.py:/docker-entrypoint-initdb.d/initdb.py:ro - ./postgres/initdb/initdb.py:/docker-entrypoint-initdb.d/initdb.py:ro
- ./postgres/initdb/00-initdb.sql:/docker-entrypoint-initdb.d/00-initdb.sql:ro
- ./postgres/initdb/02-initdb_edge_id.sql:/docker-entrypoint-initdb.d/02-initdb.sql:ro
- ./postgres/initdb/04-initdb_edge_index.sql:/docker-entrypoint-initdb.d/04-initdb.sql:ro
configs: configs:
- postgres_conf - postgres_conf
- data.zip - data.zip
......
...@@ -429,7 +429,7 @@ min_wal_size = 80MB ...@@ -429,7 +429,7 @@ min_wal_size = 80MB
# - Other Planner Options - # - Other Planner Options -
#default_statistics_target = 100 # range 1-10000 default_statistics_target = 300 # range 1-10000
#constraint_exclusion = partition # on, off, or partition #constraint_exclusion = partition # on, off, or partition
#cursor_tuple_fraction = 0.1 # range 0.0-1.0 #cursor_tuple_fraction = 0.1 # range 0.0-1.0
#from_collapse_limit = 8 #from_collapse_limit = 8
......
from signal import SIGINT, SIGTERM, signal from signal import SIGINT, SIGTERM, signal
import sys
from paho.mqtt.client import Client, CallbackAPIVersion, MQTTMessage, ReasonCode from paho.mqtt.client import Client, CallbackAPIVersion, MQTTMessage, ReasonCode
from typing import Tuple, Any, Iterable, Sequence, TypeAlias, Final from typing import Tuple, Any, Iterable, Sequence, TypeAlias, Final
from os import environ from os import environ, getenv
from logging import basicConfig, INFO, DEBUG, info, debug, critical from logging import basicConfig, INFO, DEBUG, info, debug, critical
import sys
import json import json
import psycopg import psycopg
Parameter_Query: TypeAlias = Tuple[str, Iterable[Sequence[str]]] Parameter_Query: TypeAlias = Tuple[str, Iterable[Sequence[Any]]]
HEADER_SCHEMA: Final[bytes] = b"com.bbn.tc.schema.avro.cdm18." HEADER_SCHEMA: Final[bytes] = b"com.bbn.tc.schema.avro.cdm18."
# Maps a CDM node type to the outgoing edges it can produce.
# Each entry is a tuple of (edge_type, key_path) pairs:
#   edge_type — label stored in the edge table's `type` column,
#   key_path  — nested key path into the node's JSON content that holds the
#               destination UUID (a trailing "UUID" step unwraps the
#               avro-style {"UUID": ...} wrapper; a single-key path reads a
#               flat field — presumably `localPrincipal` is stored flat,
#               TODO confirm against the CDM18 schema).
# Node types with an empty tuple emit no edges.
NODE_TYPES: Final[dict] = {
    # python trap: the trailing commas below are significant — without them a
    # one-element tuple degrades to its bare element.
    "Event": (
        ("is_generated_by", ("subject", "UUID")),
        ("affects", ("predicateObject", "UUID")),
        ("affects_2", ("predicateObject2", "UUID")),
    ),
    "Subject": (
        ("has_parent", ("parentSubject", "UUID")),
        ("has_owning_principal", ("localPrincipal",)),
    ),
    "FileObject": (("has_owning_principal", ("localPrincipal",)),),
    "UnnamedPipeObject": (
        ("ingests", ("sourceUUID", "UUID")),
        ("outputs", ("sinkUUID", "UUID")),
    ),
    # the remaining node types carry no edge-producing fields
    "NetFlowObject": (),
    "SrcSinkObject": (),
    "Principal": (),
    "Host": (),
}
def first_key(obj: dict) -> str: def first_key(obj: dict) -> str:
return next(iter(obj)) return next(iter(obj))
def unpack_node(node_entry: dict) -> Tuple[str, dict]: def unpack_node(node_entry: dict) -> Tuple[str, str, dict]:
node = node_entry["datum"] node = node_entry["datum"]
node_type = first_key(node) node_type = first_key(node)
return (node_type, node[node_type]) node = node[node_type]
uuid = node["uuid"]
return (uuid, node_type, node)
def unpack_to_queries(node_list: Sequence[dict]) -> Parameter_Query: def format_vertex_query(node_list: Sequence[Tuple[str, str, dict]]) -> Parameter_Query:
nodes = [unpack_node(entry) for entry in node_list] node_json = ((type, json.dumps(node)) for _, type, node in node_list)
node_json = ((node_type, json.dumps(node)) for node_type, node in nodes)
return ("INSERT INTO vertex (type, content) VALUES(%s, %s)", node_json) return ("INSERT INTO vertex (type, content) VALUES(%s, %s)", node_json)
def try_keys(obj: dict, keys: Sequence[str]) -> str | None:
val: Any = obj
for key in keys:
val = val.get(key)
if val is None:
break
return val
def get_edges(uuid: str, type: str, content: dict) -> Iterable[Tuple[str, str, str]]:
    """Yield ``(source_uuid, destination_uuid, edge_type)`` triples for one node.

    The candidate edge fields for this node type are looked up in
    NODE_TYPES; fields absent from *content* produce no edge.
    """
    for edge_name, key_path in NODE_TYPES[type]:
        destination = try_keys(content, key_path)
        if destination is not None:
            yield (uuid, destination, edge_name)
def format_edge_uuid(nodes: Sequence[Tuple[str, str, dict]]) -> Parameter_Query:
    """Build an executemany-style INSERT whose edge endpoints are raw UUIDs.

    Returns the parameterised statement plus a lazy stream of
    ``(source, destination, type)`` parameter rows.
    """
    def _all_edges():
        # flatten the per-node edge generators without materialising a list
        for node in nodes:
            yield from get_edges(*node)

    return ("INSERT INTO edge (source, destination, type) VALUES (%s, %s, %s)", _all_edges())
def format_edge_id(nodes: Sequence[Tuple[str, str, dict]]) -> Tuple[str, Sequence[str]]:
    """Build one INSERT that resolves edge endpoint UUIDs to vertex ids.

    The statement joins a VALUES list of (src, dst, type) triples against
    the vertex table on ``content->>'uuid'`` so the edge table stores the
    numeric vertex ids instead of UUIDs.

    Returns ``(query_string, flat_parameter_list)`` for ``cursor.execute``.
    When the batch produces no edges a harmless no-op query is returned —
    previously an empty batch emitted ``FROM (VALUES )``, a SQL syntax
    error at execution time.
    """
    edges = [edge for node in nodes for edge in get_edges(*node)]
    if not edges:
        # no-op with zero placeholders; keeps cursor.execute(*...) safe
        return ("SELECT NULL WHERE FALSE", [])
    # Hoisting the join also avoids the 3.12-only nested-quote f-string.
    value_rows = ",".join(["(%s,%s,%s)"] * len(edges))
    query_str = (
        "INSERT INTO edge (source, destination, type) "
        "SELECT vsrc.id, vdst.id, tp::EDGE_TYPE "
        f"FROM (VALUES {value_rows}) AS new_edges (src,dst,tp) "
        "JOIN vertex vsrc ON (vsrc.content->>'uuid')=src "
        "JOIN vertex vdst ON (vdst.content->>'uuid')=dst "
    )
    return (query_str, [param for edge in edges for param in edge])
class Postgres_inserter: class Postgres_inserter:
def __init__(self, db_conf={}, client_conf={}, topic="", broker_conf={}): def __init__(self, db_conf={}, client_conf={}, topic="", broker_conf={}):
self.broker_conf = broker_conf self.broker_conf = broker_conf
...@@ -45,15 +99,17 @@ class Postgres_inserter: ...@@ -45,15 +99,17 @@ class Postgres_inserter:
self.client.connect(**self.broker_conf) self.client.connect(**self.broker_conf)
self.client.loop_forever() self.client.loop_forever()
def execute_queries(self, query: Parameter_Query):
    """Run one parameterised batch through a fresh cursor and commit it."""
    statement, parameter_rows = query
    with self.db_conn.cursor() as cursor:
        cursor.executemany(statement, parameter_rows)
    self.db_conn.commit()
def on_message(self, client: Client, userdata: Any, message: MQTTMessage): def on_message(self, client: Client, userdata: Any, message: MQTTMessage):
json_list = json.loads(message.payload.replace(HEADER_SCHEMA, b"")) json_list = json.loads(message.payload.replace(HEADER_SCHEMA, b""))
queries = unpack_to_queries(json_list) nodes = [unpack_node(entry) for entry in json_list]
self.execute_queries(queries) with self.db_conn.cursor() as cursor:
cursor.executemany(*format_vertex_query(nodes))
if len(getenv("EDGE_INSERTS", "")) > 0:
if len(getenv("UUID_EDGES", "")) > 0:
cursor.executemany(*format_edge_uuid(nodes))
else:
cursor.execute(*format_edge_id(nodes))
self.db_conn.commit()
def on_connect(self, client: Client, _2, _3, reason_code: ReasonCode, _4): def on_connect(self, client: Client, _2, _3, reason_code: ReasonCode, _4):
if not reason_code.is_failure: if not reason_code.is_failure:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment.