Skip to content
Snippets Groups Projects
Commit 11278d62 authored by Sven-Ove Hänsel's avatar Sven-Ove Hänsel
Browse files

fixed subscriber mapping to cdm and added names for mqtt clients

parent b614da3a
No related branches found
No related tags found
No related merge requests found
......@@ -6,7 +6,7 @@ import os
broker_hostname=str(os.getenv('mos_host',default="localhost"))
broker_port = int(os.getenv('mos_port',default=1883))
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client4")
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"memgraph")
db_uri =str(os.getenv('mem_host',default="bolt://localhost:7687"))
neo4j_auth=("","")
abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
......@@ -73,8 +73,8 @@ def parse_json_to_cypher(input) :
# create relationships for host id
# mapping of cdm fields to relationships of nodes
if nodeType == 'Event':
if value['hostId'] != None:
relations.update({'runsOn':value['hostId']})
# if value['hostId'] != None:
# relations.update({'runsOn':value['hostId']})
if value['subject'] != None:
relations.update({'isGeneratedBy':value['subject']['com.bbn.tc.schema.avro.cdm18.UUID']})
if value['predicateObject'] != None:
......@@ -153,8 +153,27 @@ def parse_json_to_cypher(input) :
insert_strings.append(s)
# ... other relations for Object not in data
if rel =="hasParent":
q_rel = f'''
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
if rel =='hasAccountOn':
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
if rel == 'hasLocalPrincipal':
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
q_rel = "".join(insert_strings)
value_flat = {}
flatten_obj("",value,value_flat)
......@@ -193,7 +212,7 @@ def on_connect(client, userdata, flags, return_code):
'''
if return_code == 0:
print("connected")
client.subscribe("neo4j")
client.subscribe("neo4j",qos=1)
else:
print("could not connect, return code:", return_code)
client.failed_connect = True
......
......@@ -6,7 +6,7 @@ import os
broker_hostname=str(os.getenv('mos_host',default="localhost"))
port = int(os.getenv('mos_port',default=1883))
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client2")
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"neo4j")
neo4j_uri =str(os.getenv('neo4j_host',default="bolt://localhost:7687"))
neo4j_auth=None
abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
......@@ -73,8 +73,9 @@ def parse_json_to_cypher(input) :
# create relationships for host id
# mapping of cdm fields to relationships of nodes
if nodeType == 'Event':
if value['hostId'] != None:
relations.update({'runsOn':value['hostId']})
# event has no resides on
# if value['hostId'] != None:
# relations.update({'runsOn':value['hostId']})
if value['subject'] != None:
relations.update({'isGeneratedBy':value['subject']['com.bbn.tc.schema.avro.cdm18.UUID']})
if value['predicateObject'] != None:
......@@ -153,8 +154,28 @@ def parse_json_to_cypher(input) :
insert_strings.append(s)
# ... other relations for Object not in data
if rel =="hasParent":
q_rel = f'''
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
if rel =='hasAccountOn':
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
if rel == 'hasLocalPrincipal':
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
q_rel = "".join(insert_strings)
value_flat = {}
flatten_obj("",value,value_flat)
......@@ -248,7 +269,7 @@ client.loop_start()
# this try-finally block ensures that whenever we terminate the program earlier by hitting ctrl+c, it still gracefully exits
try:
i = 0
while i < abort_time_limit and client.failed_connect == False:
while i < abort_time_limit: # and client.failed_connect == False:
time.sleep(1)
i += 1
if client.failed_connect == True:
......
......@@ -6,7 +6,7 @@ import os
broker_hostname=str(os.getenv('mos_host',default="localhost"))
broker_port = int(os.getenv('mos_port',default=1883))
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client5")
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"ongdb")
db_uri =str(os.getenv('db_host',default="bolt://localhost:7687"))
neo4j_auth=("","")
abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
......@@ -73,8 +73,8 @@ def parse_json_to_cypher(input) :
# create relationships for host id
# mapping of cdm fields to relationships of nodes
if nodeType == 'Event':
if value['hostId'] != None:
relations.update({'runsOn':value['hostId']})
# if value['hostId'] != None:
# relations.update({'runsOn':value['hostId']})
if value['subject'] != None:
relations.update({'isGeneratedBy':value['subject']['com.bbn.tc.schema.avro.cdm18.UUID']})
if value['predicateObject'] != None:
......@@ -153,8 +153,27 @@ def parse_json_to_cypher(input) :
insert_strings.append(s)
# ... other relations for Object not in data
if rel =="hasParent":
q_rel = f'''
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
if rel =='hasAccountOn':
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
if rel == 'hasLocalPrincipal':
s = f'''
WITH new
MATCH (existing) where existing._uuid = "{val}"
CREATE (new) -[:{rel}]-> (existing)
'''
insert_strings.append(s)
q_rel = "".join(insert_strings)
value_flat = {}
flatten_obj("",value,value_flat)
......@@ -193,7 +212,7 @@ def on_connect(client, userdata, flags, return_code):
'''
if return_code == 0:
print("connected")
client.subscribe("neo4j")
client.subscribe("neo4j",qos=1)
else:
print("could not connect, return code:", return_code)
client.failed_connect = True
......@@ -248,7 +267,7 @@ client.loop_start()
# this try-finally block ensures that whenever we terminate the program earlier by hitting ctrl+c, it still gracefully exits
try:
i = 0
while i < abort_time_limit and client.failed_connect == False:
while i < abort_time_limit: #and client.failed_connect == False:
time.sleep(1)
i += 1
if client.failed_connect == True:
......
......@@ -14,7 +14,7 @@ broker_port = int(os.getenv('mos_port',default=1883))
file_path = str(os.getenv('path_sql_script',default='C:\\Studium_MIN\\05_Masterarbeit\\thesis\\ma_code\\code\\infrastructure\\streaming\\clients\\sub\\postgres\\import_node_edge.txt'))
schema_script_file = str(os.getenv('schema_script',default='import_node_edge.txt'))
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"Client3")
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"postgres",clean_session=False)
abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
def flatten_obj(key, val, target):
......@@ -103,9 +103,9 @@ def parse_json_to_sql_query(json,node_type):
keys = json.keys()
if node_type == 'Event':
rel = 'runsOn'
key = f"{key_header}_hostId"
add_edge(key, keys, rel, dest, edge_type,json)
# rel = 'runsOn'
# key = f"{key_header}_hostId"
# add_edge(key, keys, rel, dest, edge_type,json)
rel = 'isGeneratedBy'
key = f"{key_header}_subject{key_postfix}"
......@@ -120,7 +120,7 @@ def parse_json_to_sql_query(json,node_type):
add_edge(key, keys, rel, dest, edge_type,json)
if node_type =='Subject':
rel = 'parentSubject'
rel = 'hasParent'
key = f"{key_header}_parentSubject{key_postfix}"
add_edge(key, keys, rel, dest, edge_type,json)
......@@ -155,11 +155,11 @@ def parse_json_to_sql_query(json,node_type):
add_edge(key, keys, rel, dest, edge_type,json)
if node_type =='UnnamedPipeObject':
rel = 'resides_on'
rel = 'residesOn'
key = f"{key_header}_baseObject_hostId"
add_edge(key, keys, rel, dest, edge_type,json)
rel = 'affects1'
rel = 'affects'
key = f"{key_header}_sourceUUID"
add_edge(key, keys, rel, dest, edge_type,json)
......@@ -222,7 +222,7 @@ def on_connect(client, userdata, flags, return_code):
'''
if return_code == 0:
print("connected")
client.subscribe("neo4j")
client.subscribe("neo4j",qos=1)
else:
print("could not connect, return code:", return_code)
client.failed_connect = True
......@@ -261,18 +261,19 @@ client.on_connect = on_connect
client.on_message = on_message
client.failed_connect = False
client.connect(broker_hostname,broker_port)
client.connect(broker_hostname,broker_port,60)
client.loop_start()
# this try-finally block ensures that whenever we terminate the program earlier by hitting ctrl+c, it still gracefully exits
try:
i = 0
while i < abort_time_limit and client.failed_connect == False:
while i < abort_time_limit: #and client.failed_connect == False:
time.sleep(1)
i += 1
if client.failed_connect == True:
print('Connection failed, exiting...')
except Exception as e:
print(e)
finally:
client.disconnect()
client.loop_stop()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment