Skip to content
Snippets Groups Projects
Commit 8eeea967 authored by julian's avatar julian
Browse files

changed initdb to load entire cadets dataset with python script

parent fb773e21
No related branches found
No related tags found
No related merge requests found
......@@ -57,7 +57,7 @@ services:
target: /mosquitto/config/mosquitto.conf
postgres:
image: postgres
build: postgres
command: -c config_file=/postgres_conf
shm_size: 20gb
hostname: $PGHOST
......@@ -71,7 +71,8 @@ services:
start_period: 5s
start_interval: 1s
volumes:
- ./postgres:/docker-entrypoint-initdb.d:ro
- ./postgres/initdb:/docker-entrypoint-initdb.d
- ./streaming/pub/data:/docker-entrypoint-initdb.d/data:ro
configs:
- postgres_conf
secrets:
......
FROM postgres:latest

# python3 is required by the initdb loader invoked via
# COPY ... FROM PROGRAM 'python3 /docker-entrypoint-initdb.d/initdb.py ...'.
# Single RUN layer with --no-install-recommends and apt cache cleanup keeps
# the image small and avoids a stale package index baked into its own layer.
RUN apt-get -y update \
    && apt-get -y install --no-install-recommends python3 \
    && rm -rf /var/lib/apt/lists/*
import sys
import json
from typing import IO, Final, Iterable, Tuple
from zipfile import ZIP_LZMA, ZipFile
HEADER_SCHEMA: Final[bytes] = b"com.bbn.tc.schema.avro.cdm18."
def first_key(obj: dict) -> str:
    """Return the first key of *obj* in insertion order.

    Raises StopIteration when *obj* is empty.
    """
    key_iterator = iter(obj.keys())
    return next(key_iterator)
def unpack_node(node_entry: dict) -> Tuple[str, dict]:
    """Split a CDM entry's 'datum' wrapper into (type name, payload).

    The value under ``"datum"`` is a single-key dict whose key is the
    record's type tag and whose value is the record body.
    """
    datum = node_entry["datum"]
    type_tag = next(iter(datum))
    return type_tag, datum[type_tag]
def zipped_files(path: str) -> Iterable[IO[bytes]]:
    """Yield a readable binary handle for each member of the zip at *path*.

    Member names are echoed to stderr as they are opened.  Each yielded
    handle is closed when the generator resumes, so it is only valid for
    the duration of one iteration step.
    """
    with ZipFile(path, "r", ZIP_LZMA) as archive:
        for member_name in archive.namelist():
            print(member_name, file=sys.stderr)
            member = archive.open(member_name)
            try:
                yield member
            finally:
                member.close()
if __name__ == "__main__":
    # Stream CDM records out of the zip archive named by argv[1] and emit
    # one CSV row per record:  <vertex type>,'<json payload>'
    # This feeds Postgres COPY ... WITH (FORMAT csv, QUOTE '''').
    path = sys.argv[1]
    for file in zipped_files(path):
        for line in file:
            # Strip the Avro namespace prefix so keys are bare type names.
            json_line = json.loads(line.replace(HEADER_SCHEMA, b""))
            node_type, node = unpack_node(json_line)
            # CSV escaping: double every embedded quote character, otherwise
            # an apostrophe inside the JSON payload would terminate the
            # quoted field and corrupt the row for COPY.
            payload = json.dumps(node).replace("'", "''")
            print(f"{node_type},'{payload}'")
BEGIN;
CREATE TYPE VERTEX_TYPE AS ENUM ('Event', 'FileObject', 'Principal', 'Subject', 'Host', 'NetFlowObject', 'SrcSinkObject', 'UnnamedPipeObject');
CREATE TABLE vertex(
id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
......@@ -7,8 +5,6 @@ CREATE TABLE vertex(
content JSONB NOT NULL
);
CREATE INDEX vertex_uuid_id ON vertex ((content->>'uuid')) INCLUDE (id);
CREATE TYPE EDGE_TYPE AS ENUM ('is_generated_by', 'affects', 'affects_2', 'has_parent', 'has_owning_principal', 'ingests', 'outputs');
CREATE TABLE edge(
source INTEGER NOT NULL,
......@@ -69,29 +65,11 @@ create trigger process_vertex_insertions
referencing new table as new_vertices
for each statement execute function process_new_vertices();
COMMIT;
BEGIN;
-- Staging table: one raw CDM JSON document per row.
CREATE TEMPORARY TABLE vertex_temp(
data jsonb
);
-- Bulk-load the CADETS dataset.  awk strips the Avro namespace prefix and
-- wraps each input line in \047 (single quote) so COPY reads the whole line
-- as one quoted CSV field; DELIMITER e'\07' (BEL) is a byte assumed never to
-- occur in the data.  NOTE(review): embedded single quotes inside the JSON
-- are not doubled here, which would break the quoted field -- assumes the
-- dataset contains none; verify against the source file.
COPY vertex_temp
FROM PROGRAM 'awk -F "\07" ''{gsub("com\.bbn\.tc\.schema\.avro\.cdm18\.", ""); print "\047"$0"\047"}'' < /docker-entrypoint-initdb.d/data/ta1-cadets-e3-official_0.json'
WITH (FORMAT csv, DELIMITER e'\07', QUOTE '''');
-- Unpack each staged document: the single key under 'datum' is the vertex
-- type tag, and its value is the vertex payload.
WITH vertices AS(
SELECT jsonb_object_keys(data->'datum') vert_type, data->'datum' vert
FROM vertex_temp
)
INSERT INTO vertex (type, content)
SELECT vert_type::VERTEX_TYPE, vert->vert_type
FROM vertices;
-- Alternative loader via the Python script (kept for reference):
-- COPY vertex (type, content)
-- FROM PROGRAM 'python3 /docker-entrypoint-initdb.d/initdb.py /docker-entrypoint-initdb.d/data/ta1-cadets-e3-official.zip'
-- WITH (FORMAT csv, QUOTE '''');
COMMIT;
-- Indexes are created after the bulk load so inserts are not slowed down.
BEGIN;
-- Look up vertex ids by the record's uuid without touching the heap.
-- NOTE(review): an identical vertex_uuid_id index also appears in the schema
-- file earlier in this changeset -- confirm only one definition is active.
CREATE INDEX vertex_uuid_id ON vertex ((content->>'uuid')) INCLUDE (id);
-- Covering indexes for traversing edges in either direction.
CREATE INDEX edge_source ON edge (source) include (destination);
CREATE INDEX edge_dest ON edge (destination) include (source);
COMMIT;
......@@ -244,7 +244,7 @@ checkpoint_timeout = 15min # range 30s-1d
#checkpoint_completion_target = 0.9 # checkpoint target duration, 0.0 - 1.0
#checkpoint_flush_after = 256kB # measured in pages, 0 disables
#checkpoint_warning = 30s # 0 disables
max_wal_size = 10GB
max_wal_size = 50GB
min_wal_size = 80MB
# - Prefetching during recovery -
......
......@@ -92,7 +92,7 @@ PREPARE two_hop_new (INTEGER) AS
SELECT source, destination FROM edge
UNION ALL
SELECT destination, source FROM edge
), hop1 AS NOT MATERIALIZED (
), hop1 AS (
SELECT destination AS vertex
FROM double_edge
WHERE source = $1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment