Skip to content
Snippets Groups Projects
Commit c4497536 authored by julian's avatar julian
Browse files

stumbled on an improvement to two_hop, changed schema to remove inconsistent...

stumbled on an improvement to two_hop, changed schema to remove inconsistent and unnecessary edges with host, reconfigured docker profiles.
parent 435fa47e
No related branches found
No related tags found
No related merge requests found
......@@ -9,8 +9,8 @@ MOS_HOST=mosquitto
MOS_TOPIC=ta1-cadets-e3-official
# pub
LINES_PER_SECOND=1500
BATCH_SIZE=500
LINES_PER_SECOND=5000
BATCH_SIZE=2500
# query
QUERY_INTERVAL=1
......
......@@ -6,6 +6,8 @@ services:
- MOS_TOPIC
- LINES_PER_SECOND
- BATCH_SIZE
profiles:
- experiment
depends_on:
- mosquitto
- sub_pg
......@@ -19,6 +21,8 @@ services:
- PGDATABASE
- PGUSER
- PGPASSFILE=/run/secrets/postgres_pass
profiles:
- experiment
secrets:
- postgres_pass
depends_on:
......@@ -36,6 +40,8 @@ services:
- PGUSER
- PGPASSFILE=/run/secrets/postgres_pass
- QUERY_INTERVAL
profiles:
- experiment
secrets:
- postgres_pass
depends_on:
......@@ -44,6 +50,8 @@ services:
mosquitto:
image: eclipse-mosquitto
hostname: $MOS_HOST
profiles:
- experiment
configs:
- source: mosquitto_conf
target: /mosquitto/config/mosquitto.conf
......@@ -79,7 +87,6 @@ services:
- PGADMIN_DEFAULT_PASSWORD
- PGADMIN_DISABLE_POSTFIX
profiles:
- experiment
- inspect
ports:
- 80:80
......@@ -99,7 +106,7 @@ services:
environment:
- DS_PROMETHEUS
profiles:
- experiment
- measure
ports:
- 3000:3000
configs:
......@@ -114,7 +121,7 @@ services:
image: prom/prometheus
command: --config.file=/prometheus_conf --web.enable-admin-api
profiles:
- experiment
- measure
configs:
- prometheus_conf
......@@ -122,7 +129,7 @@ services:
image: gcr.io/cadvisor/cadvisor
privileged: true
profiles:
- experiment
- measure
ports:
- 8080:8080
volumes:
......@@ -139,7 +146,7 @@ services:
environment:
- DATA_SOURCE_NAME=postgresql://$PGUSER:$PGPASSWORD@$PGHOST/$PGDATABASE?sslmode=disable
profiles:
- experiment
- measure
depends_on:
- postgres
......
......@@ -4,13 +4,12 @@ CREATE TYPE VERTEX_TYPE AS ENUM ('Event', 'FileObject', 'Principal', 'Subject',
CREATE TABLE vertex(
id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
type VERTEX_TYPE NOT NULL,
content JSONB NOT NULL,
ancestors INTEGER[] NOT NULL DEFAULT ARRAY[]::integer[],
descendants INTEGER[] NOT NULL DEFAULT ARRAY[]::integer[]
content JSONB NOT NULL
);
CREATE INDEX vertex_uuid_id ON vertex ((content->>'uuid')) INCLUDE (id);
CREATE TYPE EDGE_TYPE AS ENUM ('is_generated_by', 'affects', 'affects_2', 'has_parent', 'has_local_principal', 'runs_on', 'resides_on', 'has_owning_principal', 'has_account_on');
CREATE TYPE EDGE_TYPE AS ENUM ('is_generated_by', 'affects', 'affects_2', 'has_parent', 'has_owning_principal', 'ingests', 'outputs');
CREATE TABLE edge(
source INTEGER NOT NULL,
destination INTEGER NOT NULL,
......@@ -40,62 +39,24 @@ begin
where type='Subject'
UNION ALL
select id, content->>'localPrincipal', 'has_local_principal'
from new_vertices
where type='Subject'
UNION ALL
select id, content->>'hostId', 'runs_on'
from new_vertices
where type='Subject'
UNION ALL
select id, content->>'localPrincipal', 'has_owning_principal'
from new_vertices
where type='FileObject'
where type='Subject' or type='FileObject'
UNION ALL
select id, content#>>'{baseObject,hostId}', 'resides_on'
from new_vertices
where type='FileObject' or type='UnnamedPipeObject' or type='NetFlowObject' or type='SrcSinkObject'
UNION ALL
select id, content#>>'{sourceUUID,UUID}', 'affects'
select id, content#>>'{sourceUUID,UUID}', 'ingests'
from new_vertices
where type='UnnamedPipeObject'
UNION ALL
select id, content#>>'{sinkUUID,UUID}', 'affects_2'
select id, content#>>'{sinkUUID,UUID}', 'outputs'
from new_vertices
where type='UnnamedPipeObject'
UNION ALL
select id, content->>'hostId', 'has_account_on'
from new_vertices
where type='Principal'
), new_edges as(
select source, id destination, e.type::EDGE_TYPE
from raw_edges e
join vertex on destination_uuid=(content->>'uuid')
join vertex on (content->>'uuid')=destination_uuid
where destination_uuid is not null
), new_ancestors as(
select source, array_agg(destination) ancs
from new_edges
group by source
), new_descendants as(
select destination, array_agg(source) descs
from new_edges
group by destination
), update_ancs as(
update vertex
set ancestors = ancs
from new_ancestors
where id=source
), update_descs as(
update vertex
set descendants = (descendants || descs)
from new_descendants
where id=destination
)
insert into edge (source, destination, type)
select * from new_edges;
......@@ -109,7 +70,7 @@ create trigger process_vertex_insertions
for each statement execute function process_new_vertices();
COMMIT;
/* BEGIN;
BEGIN;
CREATE TEMPORARY TABLE vertex_temp(
data jsonb
......@@ -127,10 +88,10 @@ INSERT INTO vertex (type, content)
SELECT vert_type::VERTEX_TYPE, vert->vert_type
FROM vertices;
COMMIT; */
COMMIT;
BEGIN;
CREATE INDEX edge_source ON edge (source);
CREATE INDEX edge_dest ON edge (destination);
CREATE INDEX edge_source ON edge (source) include (destination);
CREATE INDEX edge_dest ON edge (destination) include (source);
COMMIT;
......@@ -78,8 +78,8 @@ if __name__ == "__main__":
# 1,
# ),
# ),
# ("two_hop_edge(%s)", ("9FF334BB-9072-D756-B290-556656D73728",)),
# ("two_hop_adjacency(%s)", ("9FF334BB-9072-D756-B290-556656D73728",)),
# ("two_hop_old(%s)", (113,)),
# ("two_hop_new(%s)", (113,)),
)
csv_header = (
"Query Key",
......@@ -98,7 +98,7 @@ if __name__ == "__main__":
info("Starting...")
end_time = timedelta(hours=2)
schedule.every(interval).minutes.until(end_time).do(job, queries, csv_name).run()
# schedule.every(interval).minutes.until(end_time).do(job, queries, csv_name).run()
schedule.every(5).seconds.until(end_time).do(perf_job)
while (sleep_time := schedule.idle_seconds()) is not None:
sleep(max(sleep_time, 0))
......
......@@ -64,9 +64,9 @@ PREPARE shortest_path (int, int, int) AS
)
SELECT path FROM shortest_paths;
PREPARE two_hop_edge (INTEGER) AS
PREPARE two_hop_old (INTEGER) AS
WITH hop1 AS (
SELECT DISTINCT source, destination
SELECT source, destination
FROM edge
WHERE source = $1
OR destination = $1
......@@ -87,19 +87,24 @@ PREPARE two_hop_edge (INTEGER) AS
)
WHERE vertex <> $1;
PREPARE two_hop_adjacency (INTEGER) AS
with hop1 as(
select unnest(ancestors||descendants) neighbor
from vertex
where id = $1
), hop2 as(
select unnest(ancestors||descendants) neighbor
from vertex
join hop1 on id=hop1.neighbor
)
select distinct neighbor
from hop2
where neighbor<>$1
PREPARE two_hop_new (INTEGER) AS
WITH hop1 AS (
SELECT destination vertex
FROM edge
WHERE source = $1
union all
select neighbor
from hop1;
select source
from edge
where destination = $1
), hop2 AS (
SELECT source, destination
FROM edge
JOIN hop1 ON source = vertex OR destination = vertex
)
SELECT DISTINCT vertex
FROM (
SELECT source AS vertex FROM hop2
UNION ALL
SELECT destination FROM hop2
)
WHERE vertex <> $1;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment