Commit d23bd97a authored by julian

added configuration that registers the db connection in pgadmin on creation and disabled the pgadmin master password (unfinished); added an initdb script; renamed node to vertex; experimented with an adjacency list in vertex
parent d4f17e06
@@ -2,6 +2,7 @@
PGUSER=postgres
PGDATABASE=postgres
PGHOST=postgres
PGPASSWORD=postgres
# pub-sub
MOS_HOST=mosquitto
@@ -15,7 +16,11 @@ BATCH_SIZE=500
QUERY_INTERVAL=1
# pgadmin
PGADMIN_CONFIG_SERVER_MODE=False
PGADMIN_CONFIG_MASTER_PASSWORD_REQUIRED=False
PGADMIN_DEFAULT_EMAIL=admin@abc.de
PGADMIN_DEFAULT_PASSWORD=admin
PGADMIN_DISABLE_POSTFIX=True
# grafana
DS_PROMETHEUS=prometheus_src
@@ -2,4 +2,5 @@
__pycache__
.mypy_cache
/streaming/pub/data/
/postgres/data/
/output
\ No newline at end of file
@@ -6,6 +6,8 @@ services:
- MOS_TOPIC
- LINES_PER_SECOND
- BATCH_SIZE
profiles:
- experiment
depends_on:
- mosquitto
- sub_pg
@@ -19,6 +21,8 @@ services:
- PGDATABASE
- PGUSER
- PGPASSFILE=/run/secrets/postgres_pass
profiles:
- experiment
secrets:
- postgres_pass
depends_on:
@@ -36,8 +40,8 @@ services:
- PGUSER
- PGPASSFILE=/run/secrets/postgres_pass
- QUERY_INTERVAL
volumes:
- query-logs:/app/log
profiles:
- experiment
secrets:
- postgres_pass
depends_on:
@@ -46,6 +50,8 @@ services:
mosquitto:
image: eclipse-mosquitto
hostname: $MOS_HOST
profiles:
- experiment
configs:
- source: mosquitto_conf
target: /mosquitto/config/mosquitto.conf
@@ -59,34 +65,42 @@ services:
- PGUSER
- POSTGRES_USER=$PGUSER
- POSTGRES_DB=$PGDATABASE
- POSTGRES_PASSWORD_FILE=/run/secrets/postgres_pass
- POSTGRES_PASSWORD_FILE=/run/secrets/postgres_db_pass
healthcheck:
test: pg_isready -U postgres
start_period: 5s
start_interval: 1s
volumes:
- postgres-data:/var/lib/postgresql/data
- postgres-logs:/var/log/postgresql
- ./postgres:/docker-entrypoint-initdb.d:ro
configs:
- postgres_conf
secrets:
- postgres_pass
- postgres_db_pass
pgadmin:
image: dpage/pgadmin4
environment:
- PGADMIN_CONFIG_SERVER_MODE
- PGADMIN_CONFIG_MASTER_PASSWORD_REQUIRED
- PGADMIN_DEFAULT_EMAIL
- PGADMIN_DEFAULT_PASSWORD_FILE=/run/secrets/pgadmin_pass
- PGADMIN_DEFAULT_PASSWORD
- PGADMIN_DISABLE_POSTFIX
profiles:
- experiment
- inspect
ports:
- 80:80
volumes:
- postgres-admin:/var/lib/pgadmin
configs:
- source: pgadmin_server_conf
target: /pgadmin4/servers.json
secrets:
- pgadmin_pass
- postgres_pass
depends_on:
postgres:
condition: service_healthy
restart: true
grafana:
image: grafana/grafana-oss
@@ -137,12 +151,6 @@ services:
depends_on:
- postgres
volumes: #TODO remove unnecessary volumes (probably everything except postgres-data)
postgres-data:
postgres-logs:
postgres-admin:
query-logs:
configs:
prometheus_conf:
file: ./prometheus.yml
@@ -156,9 +164,11 @@ configs:
file: ./grafana/dashboard.yaml
grafana_dash_src:
file: ./grafana/9628_rev7.json
pgadmin_server_conf:
file: ./servers.json
secrets:
pgadmin_pass:
file: ./pgadminpass.txt
postgres_pass:
file: ./.pgpass
postgres_db_pass:
file: ./postgres_pass.txt
admin
\ No newline at end of file
BEGIN;
CREATE TYPE VERTEX_TYPE AS ENUM ('Event', 'FileObject', 'Principal', 'Subject', 'Host', 'NetFlowObject', 'SrcSinkObject', 'UnnamedPipeObject');
CREATE UNLOGGED TABLE vertex(
id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
type VERTEX_TYPE NOT NULL,
content JSONB NOT NULL,
ancestors INTEGER[] NOT NULL DEFAULT ARRAY[]::integer[],
descendants INTEGER[] NOT NULL DEFAULT ARRAY[]::integer[]
);
CREATE INDEX vertex_uuid_id ON vertex ((content->>'uuid')) INCLUDE (id);
CREATE TYPE EDGE_TYPE AS ENUM ('is_generated_by', 'affects', 'affects_2', 'has_parent', 'has_local_principal', 'runs_on', 'resides_on', 'has_owning_principal', 'has_account_on');
CREATE UNLOGGED TABLE edge(
source INTEGER NOT NULL,
destination INTEGER NOT NULL,
type EDGE_TYPE NOT NULL
);
create function process_new_vertices() returns trigger as $proc_verts$
begin
with raw_edges as(
select id source, content#>>'{subject,UUID}' destination_uuid, 'is_generated_by' type
from new_vertices
where type='Event'
UNION ALL
select id, content#>>'{predicateObject,UUID}', 'affects'
from new_vertices
where type='Event'
UNION ALL
select id, content#>>'{predicateObject2,UUID}', 'affects_2'
from new_vertices
where type='Event'
UNION ALL
select id, content#>>'{parentSubject,UUID}', 'has_parent'
from new_vertices
where type='Subject'
UNION ALL
select id, content->>'localPrincipal', 'has_local_principal'
from new_vertices
where type='Subject'
UNION ALL
select id, content->>'hostId', 'runs_on'
from new_vertices
where type='Subject'
UNION ALL
select id, content->>'localPrincipal', 'has_owning_principal'
from new_vertices
where type='FileObject'
UNION ALL
select id, content#>>'{baseObject,hostId}', 'resides_on'
from new_vertices
where type='FileObject' or type='UnnamedPipeObject' or type='NetFlowObject' or type='SrcSinkObject'
UNION ALL
select id, content#>>'{sourceUUID,UUID}', 'affects'
from new_vertices
where type='UnnamedPipeObject'
UNION ALL
select id, content#>>'{sinkUUID,UUID}', 'affects_2'
from new_vertices
where type='UnnamedPipeObject'
UNION ALL
select id, content->>'hostId', 'has_account_on'
from new_vertices
where type='Principal'
), new_edges as(
select source, id destination, e.type::EDGE_TYPE
from raw_edges e
join vertex on destination_uuid=(content->>'uuid')
where destination_uuid is not null
), new_ancestors as(
select source, array_agg(destination) ancs
from new_edges
group by source
), new_descendants as(
select destination, array_agg(source) descs
from new_edges
group by destination
), update_ancs as(
update vertex
set ancestors = ancs
from new_ancestors
where id=source
), update_descs as(
update vertex
set descendants = descs
from new_descendants
where id=destination
)
insert into edge (source, destination, type)
select * from new_edges;
return null;
end;
$proc_verts$ language plpgsql;
create trigger process_vertex_insertions
after insert on vertex
referencing new table as new_vertices
for each statement execute function process_new_vertices();
COMMIT;
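-- A minimal, hypothetical smoke test for the trigger above (the UUIDs are made up,
-- not values from the dataset; this is an illustration, not part of the init script):
-- inserting a Subject and then an Event that references it by UUID should yield one
-- 'is_generated_by' edge plus mirrored entries in the adjacency arrays.
INSERT INTO vertex (type, content)
VALUES ('Subject', '{"uuid": "00000000-0000-0000-0000-000000000001"}');
INSERT INTO vertex (type, content)
VALUES ('Event', '{"uuid": "00000000-0000-0000-0000-000000000002",
                   "subject": {"UUID": "00000000-0000-0000-0000-000000000001"}}');
-- Expected: edge contains (event_id, subject_id, 'is_generated_by'); the Event's
-- ancestors array holds the Subject's id, the Subject's descendants hold the Event's id.
SELECT source, destination, type FROM edge;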
BEGIN;
CREATE TEMPORARY TABLE vertex_temp(
data jsonb
);
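-- Load the raw CDM lines: awk strips the Avro namespace prefix
-- (com.bbn.tc.schema.avro.cdm18.) from type names and wraps each line in single
-- quotes so that COPY ... FORMAT csv ingests every line as one quoted jsonb value.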
COPY vertex_temp
FROM PROGRAM 'awk -F "\07" ''{gsub("com\.bbn\.tc\.schema\.avro\.cdm18\.", ""); print "\047"$0"\047"}'' < /docker-entrypoint-initdb.d/data/ta1-cadets-e3-official_0.json'
WITH (FORMAT csv, DELIMITER e'\07', QUOTE '''');
WITH vertices AS(
SELECT jsonb_object_keys(data->'datum') vert_type, data->'datum' vert
FROM vertex_temp
)
INSERT INTO vertex (type, content)
SELECT vert_type::VERTEX_TYPE, vert->vert_type
FROM vertices;
COMMIT;
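-- For reference, a self-contained sketch of the unwrapping step above using a
-- made-up one-record payload: each CDM record nests its payload under
-- datum -> {TypeName}, and jsonb_object_keys() exposes that type name so it can
-- be cast to VERTEX_TYPE.
WITH sample(data) AS (
    VALUES ('{"datum": {"Subject": {"uuid": "00000000-0000-0000-0000-0000000000aa"}}}'::jsonb)
), vertices AS (
    SELECT jsonb_object_keys(data->'datum') AS vert_type, data->'datum' AS vert
    FROM sample
)
SELECT vert_type::VERTEX_TYPE AS type, vert->vert_type AS content
FROM vertices;
-- Returns one row: type = 'Subject', content = the inner {"uuid": ...} object.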
BEGIN;
CREATE INDEX edge_source ON edge (source);
CREATE INDEX edge_dest ON edge (destination);
COMMIT;
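-- vertex and edge were created UNLOGGED to skip WAL writes during the bulk load;
-- switch them to LOGGED now that the initial data is in place so they are crash-safe.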
BEGIN;
ALTER TABLE vertex SET LOGGED;
ALTER TABLE edge SET LOGGED;
COMMIT;
VACUUM ANALYZE;
postgres
\ No newline at end of file
from typing import Sequence, Tuple, Any, TypeAlias, Deque
from time import strftime, perf_counter, sleep
from os import environ, path, makedirs
from logging import info, basicConfig, INFO, StreamHandler, FileHandler
from logging import info, basicConfig, INFO
from psycopg import Cursor, IsolationLevel, connect, ClientCursor
from datetime import timedelta
from collections import deque
@@ -65,7 +65,6 @@ if __name__ == "__main__":
interval = int(environ["QUERY_INTERVAL"])
log_directory = "log"
log_path = f"{log_directory}/{strftime('%Y-%m-%d_%H-%M-%S')}_query_log"
log_name = f"{log_path}.log"
csv_name = f"{log_path}.csv"
queries = (
("count_nodes", None),
@@ -79,7 +78,8 @@ if __name__ == "__main__":
# 1,
# ),
# ),
# ("two_hop(%s)", ("9FF334BB-9072-D756-B290-556656D73728",)),
# ("two_hop_edge(%s)", ("9FF334BB-9072-D756-B290-556656D73728",)),
# ("two_hop_adjacency(%s)", ("9FF334BB-9072-D756-B290-556656D73728",)),
)
csv_header = (
"Query Key",
@@ -94,11 +94,7 @@ if __name__ == "__main__":
with open(csv_name, "w") as file:
csv.writer(file).writerow(csv_header)
basicConfig(
level=INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=(StreamHandler(), FileHandler(log_name, "w")),
)
basicConfig(level=INFO, format="%(asctime)s - %(levelname)s - %(message)s")
info("Starting...")
end_time = timedelta(hours=2)
@@ -64,12 +64,12 @@ PREPARE shortest_path (UUID, UUID, int) AS
)
SELECT path FROM shortest_paths;
PREPARE two_hop (UUID) AS
PREPARE two_hop_edge (INTEGER) AS
WITH hop1 AS (
SELECT DISTINCT e.source, e.destination
FROM edge e
WHERE e.source = $1
OR e.destination = $1
SELECT DISTINCT source, destination
FROM edge
WHERE source = $1
OR destination = $1
), combined AS (
SELECT source AS node FROM hop1
UNION
@@ -87,27 +87,20 @@ PREPARE two_hop (UUID) AS
)
WHERE node <> $1;
PREPARE two_hop_new (UUID) AS
WITH hop1 AS (
SELECT source AS uuid
FROM edge
WHERE destination = $1
UNION
SELECT destination
FROM edge
WHERE source = $1
)
SELECT DISTINCT uuid
FROM (
SELECT uuid
FROM hop1
UNION
SELECT source
FROM edge e
JOIN hop1 h ON e.destination = h.uuid
UNION
SELECT destination
FROM edge e
JOIN hop1 h ON e.source = h.uuid
PREPARE two_hop_adjacency (INTEGER) AS
with hop1 as(
select unnest(ancestors||descendants) neighbor
from vertex
where id = $1
), hop2 as(
select unnest(ancestors||descendants) neighbor
from vertex
join hop1 on id=hop1.neighbor
)
WHERE uuid <> $1
\ No newline at end of file
select distinct neighbor
from hop2
where neighbor <> $1
union all
select neighbor
from hop1;
\ No newline at end of file
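-- Example comparison run (113 is an arbitrary vertex id, not a value from the
-- dataset); both prepared statements now take the integer vertex id rather than a UUID.
EXPLAIN ANALYZE EXECUTE two_hop_edge(113);
EXPLAIN ANALYZE EXECUTE two_hop_adjacency(113);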
{
"Servers": {
"1": {
"Name": "cadets",
"Group": "Servers",
"Port": 5432,
"Username": "postgres",
"PassFile": "/run/secrets/postgres_pass",
"Host": "postgres",
"SSLMode": "prefer",
"MaintenanceDB": "postgres"
}
}
}
\ No newline at end of file