Commit 78ca6193 authored by Sven-Ove Hänsel

added something

parent dff0775a
# Infrastructure for Running Experiments
## Starting the Environments
To start the infrastructure, run the docker-compose.yml in the "infrastructure" folder.
This starts all required containers (DB, metrics recording, publisher/subscriber/broker, ...).
The start order of the containers is defined via conditions: the DB subscribers wait until the respective DB container is healthy (see the `depends_on` entries in the compose diff below).
## Experiment for Goal A: "High Write Throughput"
The subscriber containers are started via Docker Compose. Metrics are recorded in Prometheus; Grafana is used to display them.
Candidate metrics (queried as sketched below):
Container CPU (cadvisor)
Container RAM (cadvisor)
Container network traffic (cadvisor)
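These series can also be pulled directly from the Prometheus HTTP API. A minimal Python sketch; the port matches the compose mapping below, while the container name filter "postgres" is only an illustrative assumption:

import requests

# instant query against the Prometheus HTTP API
PROM_URL = "http://localhost:9090/api/v1/query"
QUERY = 'rate(container_cpu_usage_seconds_total{name="postgres"}[5m])'

resp = requests.get(PROM_URL, params={"query": QUERY})
resp.raise_for_status()
for series in resp.json()["data"]["result"]:
    # each result holds the label set and the latest [timestamp, value] pair
    print(series["metric"], series["value"])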
## Start of the First Experiment
Start time: 2024-01-22 at 13:30
Log excerpts for error detection:
2024-01-22 14:56:48 2024-01-22 13:56:48.199 UTC [47] ERROR: unexpected data beyond EOF in block 3827 of relation base/5/69776
2024-01-22 14:56:48 2024-01-22 13:56:48.199 UTC [47] HINT: This has been seen to occur with buggy kernels; consider updating your system.
2024-01-22 14:56:48 2024-01-22 13:56:48.199 UTC [47] STATEMENT: INSERT INTO event (line, uuid,sequence_long,type,threadId_int,hostId,subject_UUID,predicateObject_UUID,timestampNanos,name_string,size_long,properties_map_host,properties_map_fd,properties_map_exec,properties_map_ppid) VALUES (nextval('line_number_seq'),'CDE303FC-EC03-5545-923D-566D92E04F13','383167','EVENT_LSEEK','100218','83C8ED1F-5045-DBCD-B39F-918F0DF4F851','9C003550-36E1-11E8-BF66-D9AA8AFF4A69','0C51AAE3-0594-4A51-9405-4FC4214A0041','1522720324963051371','aue_lseek','81963296','83c8ed1f-5045-dbcd-b39f-918f0df4f851','4','lsof','2549')
2024-01-22 14:56:55 2024-01-22 13:56:55.568 UTC [533] FATAL: role "root" does not exist
@@ -17,7 +17,9 @@ services:
container_name: prometheus
ports:
- 127.0.0.1:9090:9090
command: "--config.file=/etc/prometheus/prometheus.yml"
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- '--web.enable-admin-api'
volumes:
- ./monitoring/prometheus/config/prometheus.yml:/etc/prometheus/prometheus.yml
- ./monitoring/prometheus/data:/prometheus
@@ -73,15 +75,23 @@ services:
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
# Datenbank Container - Postgres
postgres:
image: postgres:latest
container_name: postgres
environment:
- PGUSER=postgres
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
healthcheck:
test: ["CMD-SHELL", "pg_isready","-U postgres"]
interval: 30s
timeout: 10s
retries: 5
start_period: 10s
logging:
options:
max-size: 10m
@@ -90,6 +100,7 @@ services:
- 5432:5432
volumes:
- ./sql/docker/data:/var/lib/postgresql/data
- ./sql/docker/postgres_logs:/var/log/postgresql
# copy the sql script to create tables
- ./sql/skript/import_cadets:/opt/import_cadets
@@ -117,6 +128,18 @@ services:
- ./streaming/mosquitto.conf:/mosquitto/config/mosquitto.conf
- ./streaming/log/mosquitto.log:/mosquitto/log/mosquitto.log
sub_pg:
container_name: sub_pg
image: sub_pg
volumes:
- ./streaming/clients/sub/postgres/:/var/lib/import/
environment:
- path_sql_script=/var/lib/import/
depends_on:
postgres:
condition: service_healthy
restart: true
sub_neo4j:
container_name: sub_neo4j
image: sub_neo4j
@@ -150,6 +173,7 @@ services:
restart: unless-stopped
depends_on:
- sub_neo4j
- sub_pg
# condition: service_healthy
# restart: true
# Command for Deleting the Data of a Time Series (requires the --web.enable-admin-api flag set above)
## Example for Postgres
curl -u admin -X POST -g 'http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]={job="postgres"}'
curl -u admin -X POST -g 'http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]={job="cadvisor"}'
curl -u admin -X POST -g 'http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]={job="neo4j"}'
\ No newline at end of file
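Note that delete_series only writes tombstones; the data disappears from disk after the next compaction or an explicit clean_tombstones call. The same cleanup can be scripted, e.g. in Python (the credentials are placeholders):

import requests

BASE = "http://localhost:9090/api/v1/admin/tsdb"
AUTH = ("admin", "admin")  # placeholder credentials

for job in ("postgres", "cadvisor", "neo4j"):
    # mark all series of this job for deletion
    requests.post(f"{BASE}/delete_series",
                  params={"match[]": f'{{job="{job}"}}'},
                  auth=AUTH).raise_for_status()

# remove the tombstoned series from disk right away
requests.post(f"{BASE}/clean_tombstones", auth=AUTH).raise_for_status()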
@@ -190,6 +190,8 @@ def connect_to_db(uri,auth):
'''
driver = GraphDatabase.driver(uri, auth=auth)
with driver.session() as session:
print("Cleanup existing data...")
session.run("MATCH (n) detach delete n")
session.run("RETURN 1 as result")
print("Successfully connected to DB...")
return driver
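For reference, a typical call of this helper; the bolt URI and the credentials are placeholders, not values from this repo:

# hypothetical usage of connect_to_db
driver = connect_to_db("bolt://localhost:7687", ("neo4j", "neo4j"))
driver.close()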
# Use an official Python runtime as a base image
FROM python:3
# Set working directory in the container
WORKDIR /app
# install dependencies
RUN pip install paho-mqtt
RUN pip install psycopg2
# Copy the Python script into the container
COPY sub_pg_cdm.py /app/
# Set default environment variables (abort time limit and DB/broker connection settings)
ENV abort_time_limit=999999
ENV pg_host='postgres'
ENV pg_port=5432
ENV pg_user=postgres
ENV pg_pw=postgres
ENV pg_db=postgres
ENV mos_host=mos1
ENV mos_port=1883
ENV path_sql_script='/var/lib/import'
# Run the Python script
CMD ["python", "sub_pg_cdm.py"]
@@ -40,6 +40,12 @@ CREATE TABLE event(
parameters_array_1_isNull VARCHAR(1024),
parameters_array_1_name_string VARCHAR(1024),
parameters_array_1_valueBytes_bytes VARCHAR(1024),
parameters_array_2_size VARCHAR(1024),
parameters_array_2_type VARCHAR(1024),
parameters_array_2_valueDataType VARCHAR(1024),
parameters_array_2_isNull VARCHAR(1024),
parameters_array_2_name_string VARCHAR(1024),
parameters_array_2_valueBytes_bytes VARCHAR(1024),
properties_map_host VARCHAR(1024),
properties_map_return_value VARCHAR(1024),
properties_map_fd VARCHAR(1024),
@@ -11,6 +11,7 @@
pg_pw = str(os.getenv('pg_pw',default='postgres'))
pg_db = str(os.getenv('pg_db',default='postgres'))
broker_hostname=str(os.getenv('mos_host',default="localhost"))
broker_port = int(os.getenv('mos_port',default=1883))
file_path = str(os.getenv('path_sql_script',default='C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\sub\\postgres\\'))
client = mqtt.Client("Client3")
abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
@@ -32,7 +33,34 @@ def flatten_obj(key, val, target):
    else:
        target[key] = val
def add_edge(key:str, keys, rel:str, dest:list, edge_type:list, data):
    # if the flattened message contains the key, record the edge target and its type
    if key in keys:
        value = data[key]
        dest.append(prepare_string_value_for_sql_query(value))
        edge_type.append(rel)
def prepare_string_value_for_sql_query(value:str):
    # wrap the value in single quotes for use in a SQL statement
    return f"'{value}'"
def create_edge_insert_query(edge_values:list):
    # edge_values = [source, [dest, ...], [edge_type, ...]]
    values = []
    for i in range(len(edge_values[1])):
        s = []
        s.append("nextval('edge_number_seq')")
        s.append(edge_values[0])
        s.append(edge_values[1][i])
        s.append(prepare_string_value_for_sql_query(edge_values[2][i]))
        values.append(f"({','.join(s)})")
    q = f"INSERT INTO edge_list (edge_no, source, dest, edge_type) VALUES {','.join(values)};"
    return q
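For illustration, with a single 'runsOn' edge the function emits a query like the following (the UUIDs are made up; source and dest arrive pre-quoted via prepare_string_value_for_sql_query):

# hypothetical input: one edge from uuid-a to uuid-b
edge_values = ["'uuid-a'", ["'uuid-b'"], ['runsOn']]
print(create_edge_insert_query(edge_values))
# -> INSERT INTO edge_list (edge_no, source, dest, edge_type) VALUES (nextval('edge_number_seq'),'uuid-a','uuid-b','runsOn');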
def parse_json_to_sql_query(json,node_type):
    print("\nparsing message: ")
    ignored_keys=['CDMVersion','source']
    table_name = node_type.lower()
    columns= []
@@ -57,54 +85,117 @@ def parse_json_to_sql_query(json,node_type):
if "'" in str(value):
value = str(value).replace("'","''")
# handling "fence-post error"
if(first):
# handle keys, that should not be added (cdm source and cdm version)
if(short_key not in ignored_keys):
columns.append(short_key)
values.append(f"'{value}'")
first= not first
else:
# handle keys, that should not be added (cdm source and cdm version)
if(short_key not in ignored_keys):
columns.append(f",{short_key}")
values.append(f",'{value}'")
print(f'{short_key}: {value}')
values.append(prepare_string_value_for_sql_query(value))
# create queries for inserting nodes in node table
key_header = f'_datum_com.bbn.tc.schema.avro.cdm18.{node_type}'
key_postfix= f'_com.bbn.tc.schema.avro.cdm18.UUID'
uuid = json[f'_datum_com.bbn.tc.schema.avro.cdm18.{node_type}_uuid']
node_values = [prepare_string_value_for_sql_query(uuid), prepare_string_value_for_sql_query(node_type)]
# create queries for inserting edges in edge table
source = prepare_string_value_for_sql_query(uuid)
dest = []
edge_type = []
keys = json.keys()
if node_type == 'Event':
print()
rel = 'runsOn'
key = f"{key_header}_hostId"
add_edge(key, keys, rel, dest, edge_type,json)
# if node_type =='Subject':
# q = 'INSERT INTO subject'
# if node_type =='FileObject':
rel = 'isGeneratedBy'
key = f"{key_header}_subject{key_postfix}"
add_edge(key, keys, rel, dest, edge_type,json)
# if node_type == 'Host':
rel = 'predicateObject'
key = f"{key_header}_predicateObject{key_postfix}"
add_edge(key, keys, rel, dest, edge_type,json)
# if node_type == 'NetFlowObject':
rel = 'predicateObject2'
key = f"{key_header}_predicateObject2{key_postfix}"
add_edge(key, keys, rel, dest, edge_type,json)
# if node_type == 'Principal':
if node_type =='Subject':
rel = 'parentSubject'
key = f"{key_header}_parentSubject{key_postfix}"
add_edge(key, keys, rel, dest, edge_type,json)
# if node_type =='SrcSinkObject':
rel ='hasLocalPrincipal'
key = f"{key_header}_localPrincipal"
add_edge(key, keys, rel, dest, edge_type,json)
# if node_type =='UnnamedPipeObject':
rel = 'runsOn'
key = f'{key_header}_hostId'
add_edge(key, keys, rel, dest, edge_type,json)
# create queries for inserting edges in edge table
    if node_type =='FileObject':
        rel = 'residesOn'
        key = f"{key_header}_baseObject_hostId"
        add_edge(key, keys, rel, dest, edge_type,json)
        rel = 'hasOwningPrincipal'
        key = f"{key_header}_localPrincipal"
        add_edge(key, keys, rel, dest, edge_type,json)
# if node_type == 'Host':
# Nothing to be done... no edges outgoing from host
if node_type == 'NetFlowObject' or node_type == 'SrcSinkObject':
rel = 'residesOn'
key = f"{key_header}_baseObject_hostId"
add_edge(key, keys, rel, dest, edge_type,json)
if node_type == 'Principal':
rel = 'hasAccountOn'
key = f"{key_header}_hostId"
add_edge(key, keys, rel, dest, edge_type,json)
if node_type =='UnnamedPipeObject':
        rel = 'residesOn'
key = f"{key_header}_baseObject_hostId"
add_edge(key, keys, rel, dest, edge_type,json)
rel = 'affects1'
key = f"{key_header}_sourceUUID"
add_edge(key, keys, rel, dest, edge_type,json)
rel = 'affects2'
key = f"{key_header}_sinkUUID"
add_edge(key, keys, rel, dest, edge_type,json)
edge_values = [source,dest,edge_type]
queries = []
# node into each table
q1 = f"INSERT INTO {table_name} (line, {','.join(columns)}) VALUES (nextval('line_number_seq'),{','.join(values)})"
queries.append(q1)
# node into node list
q2 = f"INSERT INTO node_list (node_no, uuid, type) VALUES ( nextval('node_number_seq'),{','.join(node_values)})"
queries.append(q2)
# edge into edge list
print("edge_values: ",edge_values)
if len(edge_values[1]) != 0:
q3 = create_edge_insert_query(edge_values)
queries.append(q3)
else:
print("no edges")
return queries
def handle_message(m):
    print('\nnew message: ',m)
    json_type = list(m['datum'].keys())[0]
    # short type string
    node_type = json_type.rsplit(".", 1)[1]
    # print(node_type)
    flat_json = {}
    flatten_obj("",m,flat_json)
    queries = parse_json_to_sql_query(flat_json,node_type)
    for q in queries:
        execute_db_query(q)
@@ -115,10 +206,10 @@ def on_message(client, userdata, message):
'''
data = json.loads(message.payload.decode("utf-8"))
# print(f"Received message")# {data} from: ",message.topic)
    # try:
    handle_message(data)
    # except Exception as e:
    #     print(e)
def on_connect(client, userdata, flags, return_code):
@@ -143,13 +234,14 @@ def execute_db_query(q:str):
def create_pg_schema():
    print('Create PG CDM Schema: ')
    # file_path = 'C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\sub\\postgres\\import_node_edge.txt'
    path = file_path+'import_node_edge.txt'
    long_string = ''  # avoid a NameError below if the file cannot be read
    try:
        with open(path, 'r') as file:
            long_string = file.read()
        # print(long_string)
    except FileNotFoundError:
        print(f"The file {path} was not found.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
    q = long_string
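Because path_sql_script is set with a trailing slash in the compose file but without one in the Dockerfile, joining with os.path.join would be more robust than plain concatenation; a sketch:

import os

# tolerant of a missing trailing separator in path_sql_script
path = os.path.join(file_path, 'import_node_edge.txt')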