Commit 3adf6c6f authored by Sven-Ove Hänsel
add possible solution for faster inserts
parent e9a4c7b9
%% Cell type:markdown id: tags:
# Possible optimization of the Neo4j subscriber...
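The MQTT callback below only enqueues incoming messages; a separate worker thread drains the queue and writes each batch through a single Neo4j session, instead of opening a session and running a transaction per message.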
%% Cell type:code id: tags:
``` python
import paho.mqtt.client as mqtt
import time
import json
from neo4j import GraphDatabase
import os
from threading import Thread
from queue import Queue

# Existing setup for MQTT and Neo4j connection
broker_hostname = str(os.getenv('mos_host', default="localhost"))
broker_port = int(os.getenv('mos_port', default=1883))
client = mqtt.Client("Client4")
db_uri = str(os.getenv('mem_host', default="bolt://localhost:7687"))
neo4j_auth = ("", "")
abort_time_limit = int(os.getenv('abort_time_limit', default=99999))

# Initialize a queue for incoming MQTT messages
message_queue = Queue()

def flatten_obj(key, val, target):
    # Your existing flatten_obj function
    pass

def parse_json_to_cypher(input):
    # Your existing parse_json_to_cypher function
    pass

def create_cypher_query_from_cdm(json):
    # Your existing create_cypher_query_from_cdm function
    pass

def on_message(client, userdata, message):
    data = json.loads(message.payload.decode("utf-8"))
    # Instead of processing immediately, put the message into the queue
    message_queue.put(data)

def on_connect(client, userdata, flags, return_code):
    # Your existing on_connect function
    pass

def connect_to_db(uri, auth):
    # Establish db connection to neo4j
    driver = GraphDatabase.driver(uri, auth=auth)
    # Consider moving session cleanup and connection verification outside of this function
    return driver

def execute_batch_queries(batch):
    # New function to handle batch processing of messages:
    # all queries of a batch share one session instead of one session per message
    with driver.session() as session:
        for data in batch:
            q, attr = create_cypher_query_from_cdm(data)
            session.run(q, attributes=attr)

def process_message_batch():
    while True:
        batch = []
        while not message_queue.empty():
            batch.append(message_queue.get())
        if batch:
            execute_batch_queries(batch)
            for _ in batch:
                message_queue.task_done()
        else:
            # avoid busy-waiting while the queue is empty
            time.sleep(0.1)

# Start processing thread for handling MQTT messages in batches
processing_thread = Thread(target=process_message_batch, daemon=True)
processing_thread.start()

driver = connect_to_db(db_uri, neo4j_auth)
client.on_connect = on_connect
client.on_message = on_message
client.failed_connect = False  # must exist before the loop below reads it
client.connect(broker_hostname, broker_port)
client.loop_start()
# MQTT client loop and cleanup logic with try-finally
try:
    i = 0
    while i < abort_time_limit and not client.failed_connect:
        time.sleep(1)
        i += 1
finally:
    client.disconnect()
    client.loop_stop()
    driver.close()
```
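%% Cell type:markdown id: tags:
A further option for faster inserts is to send each batch as a single Cypher statement with `UNWIND`, so a batch costs one round trip and one transaction instead of one per message. The sketch below is an assumption, not the subscriber's actual schema: it treats messages as flat dictionaries with a `uuid` key and writes them as generic `Node` vertices, so the label and query would have to be adapted to the CDM node types.
%% Cell type:code id: tags:
``` python
# Minimal sketch of an UNWIND-based batch insert. Assumed schema: flat dicts
# with a 'uuid' key, stored as generic :Node vertices (adapt to CDM types).
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""))

def execute_batch_unwind(batch):
    # One statement per batch: the server iterates the rows, so the client
    # pays a single round trip and a single transaction.
    with driver.session() as session:
        session.run(
            "UNWIND $rows AS row "
            "MERGE (n:Node {uuid: row.uuid}) "
            "SET n += row",
            rows=batch,
        )

execute_batch_unwind([
    {"uuid": "08DB617B-6100-51F3-9742-902710EDCA7D", "type": "EVENT_FCNTL"},
    {"uuid": "42DD2C9E-36C2-11E8-BF66-D9AA8AFF4A69", "type": "FILE_OBJECT_FILE"},
])
driver.close()
```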
%% Cell type:markdown id: tags:
CDM Data
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
import paho.mqtt.client as mqtt
import time
import json
import os
import psycopg2

pg_host = str(os.getenv('pg_host', default='localhost'))
pg_port = int(os.getenv('pg_port', default='5432'))
pg_user = str(os.getenv('pg_user', default='postgres'))
pg_pw = str(os.getenv('pg_pw', default='postgres'))
pg_db = str(os.getenv('pg_db', default='postgres'))
broker_hostname = str(os.getenv('mos_host', default="localhost"))
broker_port = int(os.getenv('mos_port', default=1883))
client = mqtt.Client("Client3")
abort_time_limit = int(os.getenv('abort_time_limit', default=99999))

def flatten_obj(key, val, target):
    complexType = type(val)
    if val is None:
        return None
    elif complexType is dict:
        for k, v in val.items():
            flatten_obj(key + "_" + k, v, target)
    elif complexType is list:
        for i in range(len(val)):
            v = val[i]
            flatten_obj(key + "_" + str(i), v, target)
    elif complexType is object:
        for k, v in val.__dict__.items():
            flatten_obj(key + "_" + k, v, target)
    else:
        target[key] = val

def add_edge(key: str, keys, rel: str, dest: list, edge_type: list, data):
    if key in keys:
        value = data[key]
        dest.append(prepare_string_value_for_sql_query(value))
        edge_type.append(rel)

def prepare_string_value_for_sql_query(value: str):
    return f"'{value}'"

def create_edge_insert_query(edge_values: list):
    # edge_values = [source, dest_list, edge_type_list]
    values = []
    for i, dest_val in enumerate(edge_values[1]):
        s = [
            "nextval('edge_number_seq')",
            edge_values[0],
            dest_val,
            prepare_string_value_for_sql_query(edge_values[2][i]),
        ]
        values.append(f"({','.join(s)})")
    q = f"INSERT INTO edge_list (edge_no, source, dest, edge_type) VALUES {','.join(values)};"
    return q

def parse_json_to_sql_query(flat_json, node_type):
    print("\nparsing message: ")
    ignored_keys = ['CDMVersion', 'source']
    table_name = str.lower(node_type)
    columns = []
    values = []
    # create query for inserting the node into its type-specific table
    for key, value in flat_json.items():
        # remove the schema header from the key
        short_key = key.replace("_com.bbn.tc.schema.avro.cdm18", "")
        # strip the prefix to get the column name: short keys such as
        # "_CDMVersion" only lose the leading underscore, all others lose
        # "_datum." plus the node type and its separators
        if len(short_key) < 12:
            short_key = short_key[1:]
        else:
            short_key = short_key[2 + 6 + len(node_type):]
        # replace . with _ to match the column header
        short_key = short_key.replace('.', '_')
        # escape single quotes (') in value
        if "'" in str(value):
            value = str(value).replace("'", "''")
        if short_key not in ignored_keys:
            columns.append(short_key)
            values.append(prepare_string_value_for_sql_query(value))
    # create queries for inserting nodes into the node table
    key_header = f'_datum_com.bbn.tc.schema.avro.cdm18.{node_type}'
    key_postfix = '_com.bbn.tc.schema.avro.cdm18.UUID'
    uuid = flat_json[f'{key_header}_uuid']
    node_values = [prepare_string_value_for_sql_query(uuid), prepare_string_value_for_sql_query(node_type)]
    # create queries for inserting edges into the edge table
    source = prepare_string_value_for_sql_query(uuid)
    dest = []
    edge_type = []
    keys = flat_json.keys()
    if node_type == 'Event':
        rel = 'runsOn'
        key = f"{key_header}_hostId"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
        rel = 'isGeneratedBy'
        key = f"{key_header}_subject{key_postfix}"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
        rel = 'predicateObject'
        key = f"{key_header}_predicateObject{key_postfix}"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
        rel = 'predicateObject2'
        key = f"{key_header}_predicateObject2{key_postfix}"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
    if node_type == 'Subject':
        rel = 'parentSubject'
        key = f"{key_header}_parentSubject{key_postfix}"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
        rel = 'hasLocalPrincipal'
        key = f"{key_header}_localPrincipal"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
        rel = 'runsOn'
        key = f'{key_header}_hostId'
        add_edge(key, keys, rel, dest, edge_type, flat_json)
    if node_type == 'FileObject':
        rel = 'residesOn'
        key = f"{key_header}_baseObject_hostId"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
        rel = 'hasOwningPrincipal'
        key = f"{key_header}_localPrincipal"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
    # if node_type == 'Host':
    #     nothing to be done... no edges outgoing from Host
    if node_type == 'NetFlowObject' or node_type == 'SrcSinkObject':
        rel = 'residesOn'
        key = f"{key_header}_baseObject_hostId"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
    if node_type == 'Principal':
        rel = 'hasAccountOn'
        key = f"{key_header}_hostId"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
    if node_type == 'UnnamedPipeObject':
        rel = 'resides_on'
        key = f"{key_header}_baseObject_hostId"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
        rel = 'affects1'
        key = f"{key_header}_sourceUUID"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
        rel = 'affects2'
        key = f"{key_header}_sinkUUID"
        add_edge(key, keys, rel, dest, edge_type, flat_json)
    edge_values = [source, dest, edge_type]
    queries = []
    # node into its type-specific table
    q1 = f"INSERT INTO {table_name} (line, {','.join(columns)}) VALUES (nextval('line_number_seq'),{','.join(values)})"
    queries.append(q1)
    # node into node list
    q2 = f"INSERT INTO node_list (node_no, uuid, type) VALUES (nextval('node_number_seq'),{','.join(node_values)})"
    queries.append(q2)
    # edges into edge list
    print("edge_values: ", edge_values)
    if len(edge_values[1]) != 0:
        q3 = create_edge_insert_query(edge_values)
        queries.append(q3)
    else:
        print("no edges")
    return queries

def handle_message(m):
    print('\nnew message: ', m)
    json_type = list(m['datum'].keys())[0]
    # short type string
    node_type = json_type.rsplit(".", 1)[1]
    # print(node_type)
    flat_json = {}
    flatten_obj("", m, flat_json)
    queries = parse_json_to_sql_query(flat_json, node_type)
    for q in queries:
        execute_db_query(q)

def on_message(client, userdata, message):
    '''
    The callback function for the message listener
    '''
    data = json.loads(message.payload.decode("utf-8"))
    # print(f"Received message {data} from: ", message.topic)
    # try:
    handle_message(data)
    # except Exception as e:
    #     print(e)

def on_connect(client, userdata, flags, return_code):
    '''
    Connect and subscribe to the Mosquitto topic
    '''
    if return_code == 0:
        print("connected")
        client.subscribe("neo4j")
    else:
        print("could not connect, return code:", return_code)
        client.failed_connect = True

def connect_database():
    print('Create DB Connection')
    return psycopg2.connect(host=pg_host, port=pg_port, user=pg_user, password=pg_pw, database=pg_db)

def execute_db_query(q: str):
    cursor = connection.cursor()
    cursor.execute(q)
    connection.commit()

def create_pg_schema():
    print('Create PG CDM Schema: ')
    file_path = 'C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\sub\\postgres\\import_node_edge.txt'
    try:
        with open(file_path, 'r') as file:
            long_string = file.read()
            # print(long_string)
    except FileNotFoundError:
        print(f"The file {file_path} was not found.")
        return
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return
    # only run the schema script if the file was read successfully
    execute_db_query(long_string)

connection = connect_database()
create_pg_schema()
# client.username_pw_set(username="user_name", password="password")  # uncomment if you use password auth
client.on_connect = on_connect
client.on_message = on_message
client.failed_connect = False
client.connect(broker_hostname, broker_port)
client.loop_start()
# this try-finally block ensures that the program still exits gracefully
# when it is terminated early with Ctrl+C
try:
    i = 0
    while i < abort_time_limit and not client.failed_connect:
        time.sleep(1)
        i += 1
    if client.failed_connect:
        print('Connection failed, exiting...')
finally:
    client.disconnect()
    client.loop_stop()
    connection.close()
```
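%% Cell type:markdown id: tags:
Building INSERT strings by hand and committing one statement per message is the main cost on the Postgres side. A hedged sketch of a faster variant with `psycopg2.extras.execute_values`, which sends many rows per statement and lets the driver do the quoting; it assumes the `edge_list` table and `edge_number_seq` sequence created by the schema script above, and plain (unquoted) Python values for `source`, `dest`, and `edge_type`.
%% Cell type:code id: tags:
``` python
# Sketch: batch edge inserts with execute_values instead of per-row INSERTs.
# Assumes the edge_list table and edge_number_seq sequence from the schema
# script above; rows hold raw values, psycopg2 handles the quoting.
import psycopg2
from psycopg2.extras import execute_values

connection = psycopg2.connect(host='localhost', port=5432, user='postgres',
                              password='postgres', database='postgres')

def insert_edges(rows):
    # rows: list of (source, dest, edge_type) tuples
    with connection.cursor() as cursor:
        execute_values(
            cursor,
            "INSERT INTO edge_list (edge_no, source, dest, edge_type) VALUES %s",
            rows,
            template="(nextval('edge_number_seq'), %s, %s, %s)",
        )
    connection.commit()

insert_edges([
    ("08DB617B-6100-51F3-9742-902710EDCA7D", "83C8ED1F-5045-DBCD-B39F-918F0DF4F851", "runsOn"),
])
connection.close()
```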
%% Cell type:markdown id: tags:
# Postgres Subscriber first step
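This first version only verifies the pipeline: it connects to Postgres, runs the schema script, subscribes to the `neo4j` topic, and prints each received message without parsing or inserting it.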
%% Cell type:code id: tags:
``` python
import paho.mqtt.client as mqtt
import time
import json
import os
import psycopg2

pg_host = str(os.getenv('pg_host', default='localhost'))
pg_port = int(os.getenv('pg_port', default='5432'))
pg_user = str(os.getenv('pg_user', default='postgres'))
pg_pw = str(os.getenv('pg_pw', default='postgres'))
pg_db = str(os.getenv('pg_db', default='postgres'))
broker_hostname = str(os.getenv('mos_host', default="localhost"))
broker_port = int(os.getenv('mos_port', default=1883))
client = mqtt.Client("Client3")
abort_time_limit = int(os.getenv('abort_time_limit', default=99999))

def handle_message(m):
    print(m)

def on_message(client, userdata, message):
    '''
    The callback function for the message listener
    '''
    data = json.loads(message.payload.decode("utf-8"))
    print("Received message")  # print(data, "from:", message.topic)
    handle_message(data)

def on_connect(client, userdata, flags, return_code):
    '''
    Connect and subscribe to the Mosquitto topic
    '''
    if return_code == 0:
        print("connected")
        client.subscribe("neo4j")
    else:
        print("could not connect, return code:", return_code)
        client.failed_connect = True

def connect_database():
    print('Create DB Connection')
    return psycopg2.connect(host=pg_host, port=pg_port, user=pg_user, password=pg_pw, database=pg_db)

def execute_db_query(q: str):
    cursor = connection.cursor()
    cursor.execute(q)
    connection.commit()

def create_pg_schema():
    print('Create PG CDM Schema: ')
    file_path = 'C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\sub\\postgres\\import_sql.txt'
    try:
        with open(file_path, 'r') as file:
            long_string = file.read()
            print(long_string)
    except FileNotFoundError:
        print(f"The file {file_path} was not found.")
        return
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return
    # only run the schema script if the file was read successfully
    execute_db_query(long_string)

connection = connect_database()
create_pg_schema()
# client.username_pw_set(username="user_name", password="password")  # uncomment if you use password auth
client.on_connect = on_connect
client.on_message = on_message
client.failed_connect = False
client.connect(broker_hostname, broker_port)
client.loop_start()
# this try-finally block ensures that the program still exits gracefully
# when it is terminated early with Ctrl+C
try:
    i = 0
    while i < abort_time_limit and not client.failed_connect:
        time.sleep(1)
        i += 1
    if client.failed_connect:
        print('Connection failed, exiting...')
finally:
    client.disconnect()
    client.loop_stop()
    connection.close()
```
%% Cell type:code id: tags:
``` python
import psycopg2

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    host='localhost',
    port='5432',
    user='postgres',
    password='postgres',
    database='postgres'
)
# Create a cursor object to execute SQL queries
cursor = conn.cursor()
# Define your data
data = ('John Doe', 25, 'john.doe@example.com')
# Execute an INSERT query
# cursor.execute('INSERT INTO your_table_name (name, age, email) VALUES (%s, %s, %s)', data)
cursor.execute('SELECT * FROM test')
rows = cursor.fetchall()
# Display the results
for row in rows:
    print(row)
# Commit the transaction and close the connection
conn.commit()
conn.close()
```
%% Cell type:code id: tags:
``` python
### Reading a single file in windows of 1000 lines takes minutes
import json
import os

def read_moving_window_lines(file_path, lines_per_window, line_count):
    with open(file_path, 'r') as file:
        lines_read = 0
        while True:
            try:
                # stop once the requested number of lines has been read
                if lines_read >= line_count:
                    break
                # Read the next lines_per_window lines
                window_data = [next(file) for _ in range(lines_per_window)]
                lines_read += len(window_data)
                # Process the current window_data
                process_window_lines(window_data)
            except StopIteration:
                # end of file reached mid-window
                break

# Process a window
def process_window_lines(window_data):
    print("Processing window:")
    values = []
    for line in window_data:
        json_obj = json.loads(line.strip())
        json_string = json.dumps(json_obj)
        values.append(json_string)
    return values

lines_per_window = 1000
path = 'C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\pub\\data\\'
files = ['ta1-cadets-e3-official_1.json', 'ta1-cadets-e3-official_2.json', 'ta1-cadets-e3-official_3.json']
line_count = [4999999, 4999999, 3911712]  # line count corresponding to each file
for file, stop_index in zip(files, line_count):
    file_path = path + file
    print("Reading file: ", file)
    read_moving_window_lines(file_path, lines_per_window, stop_index)
```
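%% Cell type:markdown id: tags:
If the windowed reading itself is the bottleneck, `itertools.islice` yields the same moving window without a `next()` call per line and returns a short final window instead of discarding it. A minimal sketch under the same assumptions (line-delimited JSON files, window size 1000):
%% Cell type:code id: tags:
``` python
# Sketch: windowed reading with itertools.islice; a short final window at EOF
# is yielded instead of being dropped.
import json
from itertools import islice

def iter_windows(file_path, lines_per_window, max_lines):
    with open(file_path, 'r') as f:
        remaining = max_lines
        while remaining > 0:
            window = list(islice(f, min(lines_per_window, remaining)))
            if not window:  # EOF
                break
            remaining -= len(window)
            yield window

# Example usage (hypothetical path):
# for window in iter_windows('ta1-cadets-e3-official_1.json', 1000, 4999999):
#     values = [json.dumps(json.loads(line)) for line in window]
```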
%% Cell type:code id: tags:
``` python
# '''
# Create CDM data ready for publishing
# '''
# # data = {
# # "datum":{
# # "com.bbn.tc.schema.avro.cdm18.Event":{ # Label
# # "uuid":"08DB617B-6100-51F3-9742-902710EDCA7D", # Property
# # # "sequence":{"long":4600591}, # Property
# # "sequence": 4600591,
# # "type":"EVENT_FCNTL", # Property
# # "threadId":{"int":100117}, # Property
# # "hostId":"83C8ED1F-5045-DBCD-B39F-918F0DF4F851", # Relationship: runsOn
# # "subject":{"com.bbn.tc.schema.avro.cdm18.UUID":"72FB0406-3678-11E8-BF66-D9AA8AFF4A69"}, # Relationship: isGeneratedBy
# # "predicateObject":"null", # Relationship: affects1
# # "predicateObjectPath":"null", # Property
# # "predicateObject2":"null", # Relationship: affects2
# # "predicateObject2Path":"null", # Property
# # "timestampNanos":1522828473820631110, # Property
# # "name":{"string":"aue_fcntl"}, # Property
# # "parameters":{
# # "array":[{
# # "size":-1,
# # "type":"VALUE_TYPE_CONTROL",
# # "valueDataType":"VALUE_DATA_TYPE_INT",
# # "isNull":False,
# # "name":{"string":"cmd"},
# # "runtimeDataType":"null",
# # "valueBytes":{"bytes":"04"},
# # "provenance":"null",
# # "tag":"null",
# # "components":"null"
# # }]
# # }, # Property
# # "location":"null", # Property
# # "size":"null", # Property
# # "programPoint":"null", # Property
# # "properties":{
# # "map":{
# # "host":"83c8ed1f-5045-dbcd-b39f-918f0df4f851",
# # "return_value":"0",
# # "fd":"4",
# # "exec":"python2.7",
# # "ppid":"1"
# # }
# # } # Property
# # }
# # },
# # "CDMVersion":"18", # Property
# # "source":"SOURCE_FREEBSD_DTRACE_CADETS" # Property
# # }
# data = [{
# "datum":{
# "com.bbn.tc.schema.avro.cdm18.Host":{
# "uuid":"83C8ED1F-5045-DBCD-B39F-918F0DF4F851",
# "hostName":"ta1-cadets",
# "hostIdentifiers":[],
# "osDetails":"FreeBSD 12.0-CURRENT FreeBSD 12.0-CURRENT #1 1863588dca9(HEAD)-dirty: Wed Feb 28 17:23:37 UTC 2018 root@ta1-cadets:/usr/obj/data/update/build-meta/freebsd/amd64.amd64/sys/CADETS amd64",
# "hostType":"HOST_DESKTOP",
# "interfaces":[{
# "name":"vtnet0",
# "macAddress":"52:54:00:f0:0d:23",
# "ipAddresses":["fe80::5054:ff:fef0:d23%vtnet0","10.0.6.23"]
# },
# {"name":"vtnet1",
# "macAddress":"52:54:00:f0:08:23",
# "ipAddresses":["fe80::5054:ff:fef0:823%vtnet1","128.55.12.73"]
# }]
# }
# },
# "CDMVersion":"18",
# "source":"SOURCE_FREEBSD_DTRACE_CADETS"
# },
# {
# "datum":{
# "com.bbn.tc.schema.avro.cdm18.FileObject":{
# "uuid":"42DD2C9E-36C2-11E8-BF66-D9AA8AFF4A69",
# "baseObject":{
# "hostId":"83C8ED1F-5045-DBCD-B39F-918F0DF4F851",
# "permission":"null",
# "epoch":"null",
# "properties":{"map":{}}
# },
# "type":"FILE_OBJECT_FILE",
# "fileDescriptor":"null",
# "localPrincipal":"null",
# "size":"null",
# "peInfo":"null",
# "hashes":"null"
# }
# },
# "CDMVersion":"18",
# "source":"SOURCE_FREEBSD_DTRACE_CADETS"
# }
# ]
```
%% Cell type:code id: tags:
``` python
import json
# data = '{"datum":{"com.bbn.tc.schema.avro.cdm18.Event":{"uuid":"5CC868CD-FF30-5E2B-BB74-6C5B474A62B2","sequence":{"long":1},"type":"EVENT_CLOSE","threadId":{"int":100117},"hostId":"83C8ED1F-5045-DBCD-B39F-918F0DF4F851","subject":{"com.bbn.tc.schema.avro.cdm18.UUID":"72FB0406-3678-11E8-BF66-D9AA8AFF4A69"},"predicateObject":{"com.bbn.tc.schema.avro.cdm18.UUID":"42DD2DBA-36C2-11E8-BF66-D9AA8AFF4A69"},"predicateObjectPath":null,"predicateObject2":null,"predicateObject2Path":null,"timestampNanos":1522706861813350340,"name":{"string":"aue_close"},"parameters":{"array":[]},"location":null,"size":null,"programPoint":null,"properties":{"map":{"host":"83c8ed1f-5045-dbcd-b39f-918f0df4f851","return_value":"0","fd":"28","exec":"python2.7","ppid":"1"}}}},"CDMVersion":"18","source":"SOURCE_FREEBSD_DTRACE_CADETS"}'
# input = json.loads(data)
# jsonType = list(input['datum'].keys())[0]
# # print(jsonType)
# # short type string
# nodeType = jsonType.rsplit(".", 1)[1]
# # print(nodeType)
# # data of object
# value = input["datum"][jsonType]
# relations=dict(
# runsOn=""
# ,isGeneratedBy=""
# ,affects=list()
# ,residesOn=""
# ,isPartOf=""
# ,hasOwningPricipal=""
# ,hasTag=""
# ,hasParent=""
# ,hasOwningPrincipal=""
# ,hasAccountOn=""
# )
# # create relationships for host id
# if nodeType == 'Event':
# relations.update({'runsOn':value['hostId']})
# relations.update({'isGeneratedBy':value['subject']['com.bbn.tc.schema.avro.cdm18.UUID']})
# if value['predicateObject'] != "Null":
# relations['affects'].append(value['predicateObject'])
# if value['predicateObject2'] != "Null":
# relations['affects'].append(value['predicateObject2'])
# value.pop('hostId')
# value.pop('subject')
# value.pop('predicateObject')
# value.pop('predicateObjectPath')
# value.pop('predicateObject2')
# value.pop('predicateObject2Path')
# print("json for insertion: ",value)
# # print("relations: ",relations)
# # print(nodeType)
def flatten_dict(d, parent_key='', sep='_'):
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        elif isinstance(v, list):
            for i, val in enumerate(v):
                if isinstance(val, dict):
                    items.extend(flatten_dict(val, f"{new_key}{sep}{i}", sep=sep).items())
                else:
                    # list entries that are not dicts become plain values
                    items.append((f"{new_key}{sep}{i}", val))
        else:
            items.append((new_key, v))
    return dict(items)

# Your JSON-like object
attributes = {
    'uuid': '08DB617B-6100-51F3-9742-902710EDCA7D',
    'sequence': 4600591,
    'type': 'EVENT_FCNTL',
    'threadId': {'int': 100117},
    'timestampNanos': 1522828473820631110,
    'name': {'string': 'aue_fcntl'},
    'parameters': {'array': [{'size': -1, 'type': 'VALUE_TYPE_CONTROL', 'valueDataType': 'VALUE_DATA_TYPE_INT', 'isNull': False, 'name': {'string': 'cmd'}, 'runtimeDataType': None, 'valueBytes': {'bytes': '04'}, 'provenance': None, 'tag': None, 'components': None}]},
    'location': None,
    'size': None,
    'programPoint': None,
    'properties': {'map': {'host': '83c8ed1f-5045-dbcd-b39f-918f0df4f851', 'return_value': '0', 'fd': '4', 'exec': 'python2.7', 'ppid': '1'}}
}
# Flatten the JSON-like object
flattened_attributes = flatten_dict(attributes)
# Print the flattened dictionary
print('flat: ', flattened_attributes)
# trim values
# print(value)
# print(isGeneratedBy)
```