diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Dockerfile b/code/infrastructure/streaming/clients/sub/memgraph/Dockerfile index b5e785c0df7c463cb72305ef41c42ea15d0e06af..a1b4a9428b679eb831adb72b05591bd794e23f6c 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/Dockerfile +++ b/code/infrastructure/streaming/clients/sub/memgraph/Dockerfile @@ -15,7 +15,8 @@ RUN pip install neo4j # && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz # Copy the Python script into the container -COPY sub_mem.py /app/ +#COPY sub_mem.py /app/ +COPY sub_mem_whole_batch_subgraph_oriented.py /app/ # Set environment variable 'time_limit' ENV abort_time_limit=999999 @@ -25,4 +26,4 @@ ENV mos_port=1883 ENV create_indices=False # Run the Python script -CMD ["python", "sub_mem.py"] +CMD ["python", "sub_mem_whole_batch_subgraph_oriented.py"] diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Merge/sub_mem_whole_batch_kind_oriented_using_Merge_no_label.py b/code/infrastructure/streaming/clients/sub/memgraph/Merge/sub_mem_whole_batch_kind_oriented_using_Merge_no_label.py index a5b7d365e1723f652aa27338899ecd44192141d4..49df2daed72635dfa6adee41ce34391dccb866bf 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/Merge/sub_mem_whole_batch_kind_oriented_using_Merge_no_label.py +++ b/code/infrastructure/streaming/clients/sub/memgraph/Merge/sub_mem_whole_batch_kind_oriented_using_Merge_no_label.py @@ -7,8 +7,8 @@ from neo4j import GraphDatabase import os # Muster -# CREATE ALL nodes (Using Param) -# MATCH all lookups (not known yet) +# MERGE all lookups (not known yet) +# MERGE all nodes (Using Param) # CREATE all EDGES broker_hostname=str(os.getenv('mos_host',default="localhost")) @@ -22,6 +22,8 @@ on_disk = False analytic = True retries=int(os.getenv('lines_per_window',default=1000)) +# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary. +# The reduced dictionary is written into the target def flatten_obj(key, val, target): complexType = type(val) if val is None: @@ -57,13 +59,15 @@ def flatten_obj(key, val, target): -# Data for the whole Batch, so it doesnt get copied when used in another function -# Holds a List with all known identifier, a directory to look up if the Node is already known -# Holds the Batch query -# Holds a number to create diffrent idetifier +# batchDataHolder: holds Data for the whole Batch and manages following variables +# knownNodes: holds a dictionary with all known Nodes, uses uuid as key and identifier as value +# create_Nodes: holds a list of Cypher string parts to create the new nodes +# identifierNumber: holds a number to create unique identifier +# lookup_nodes: holds a list of Cypher string parts to lookup nodes +# insert_relations: holds a list of Cypher string parts to create edges -# Holds a list with the Nodes to match and a list with relations to add -# manages the build of these lists +# The get method of each list variable creates a whole String from the parts, these Strings must append a command +# in Cypher MATCH, MERGE, CREATE and so on class batchDataHolder: def __init__(self): self.knownNodes = dict() @@ -121,7 +125,8 @@ class batchDataHolder: def get_create_Nodes(self): return ' '.join(self.create_Nodes) - + # Check if target Node is already known, if not create a lookup. + # use the new or old identifier and add edge String. def nodes_relations_to_cypher( self, val, newIdentifier, rel, type_letter, type): #type either ':Host' 'Subject' .... 
or ''
        try:
            if self.check_key(val): # Node is already known use the same identifier
@@ -129,34 +134,39 @@ class batchDataHolder:
                 self.insert_relations.append(f'''({newIdentifier}) -[:{rel}]-> ({ident})''')
             else :
                 ident = self.get_Identifier_Increment_add_Node(type_letter, val) # get identifier and add to knownNode
-                s = f'({ident}:Node {{uuid: "{val}"}})' # Can't look after the Type because maybe the Node was already used but wihtout knowing the Type
-                # so a Merge with the type would create a new Node with the same uuid
+                s = f'({ident}:Node {{uuid: "{val}"}})' # Can't match on the type, because the node may already have been
+                # used, and thus created, without its type being known,
+                # so a MERGE with the type would create a new node with the same uuid instead of finding the old one
                 self.lookup_nodes.append(s)
                 self.insert_relations.append(f'''({newIdentifier}) -[:{rel}]-> ({ident})''')
         except:
             print(type_letter)
 
-nodes_count = 0
+#nodes_count = 0
 
+# parse_json_to_cypher:
+# 1. create variables for the whole message
+# 2. read every node to be created, together with its edges
+    #2.1 load the string as json and flatten the node properties into value_flat
+    #2.2 create the relationships from the json
+    #2.3 transform the relationships to Cypher (reusing already known nodes)
+    #2.4 transform the node to Cypher
+# 3. join the relation and node strings into one big Cypher query
+# 4. return the query and, depending on the variant, the parameters
 def parse_json_to_cypher(data_list) :
-    global nodes_count
-
+    #global nodes_count
 
-    if nodes_count % 2000 == 0:
-        print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
-
-    #nodes_count = nodes_count
+    #if nodes_count % 2000 == 0:
+    #    print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
 
-    #knownNodes = dict() #to check if Node is known and no Lookup is needed
     batchData = batchDataHolder()
-    all_values = {}
+    all_values = {} # maps each identifier used in the query to the property values to be inserted
 
     for input_string in data_list: # run throuh subgraphs of the Batch
 
-        nodes_count = nodes_count +1
+        #nodes_count = nodes_count +1
 
         input = json.loads(input_string)
         jsonType = list(input['datum'].keys())[0]
@@ -263,7 +273,7 @@ def parse_json_to_cypher(data_list) :
 
-        # lookup existing nodes for relations
+        # transform the relationships to Cypher
        for rel in relations.keys():
            val = relations[rel]
            if val != '':
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Notes_and_legend.txt b/code/infrastructure/streaming/clients/sub/memgraph/Notes_and_legend.txt
new file mode 100644
index 0000000000000000000000000000000000000000..8b63c3e65bd80c5b04bafa98df743a2af5561b55
--- /dev/null
+++ b/code/infrastructure/streaming/clients/sub/memgraph/Notes_and_legend.txt
@@ -0,0 +1,22 @@
+subgraph_oriented = incremental approach
+kind_oriented = sorted approach
+Merge = MERGE, code based on the kind-oriented approach
+Unwind = UNWIND
+
+sub_mem_original = the subscriber from the original experiment, with comments added at lines 15 and 32
+These comments describe what was changed for all new subscribers
+
+Procedure_for_module_in_memgraph = the procedure for the Memgraph query module
+
+
+The newest and best version at the moment is 'sub_mem_whole_batch_kind_oriented_two_label_internal_collision_handle'.
+Using 'OPTIONAL MATCH' throws an error at the edge step, which tries to create an edge on a null node.
+The null node was not found by 'OPTIONAL MATCH'; a normal 'MATCH' would stop the query at that point, but it wouldn't throw an error.
+So this version is good for identifying cases that still need to be handled.
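+
+Illustration (a hedged sketch for these notes, not taken from the experiments), with a uuid that is not in the database:
+    MATCH (a:Node {uuid: "missing"}) CREATE (a)-[:runsOn]->(:Node)
+        -> matches no row, so the CREATE never runs and no error is raised
+    OPTIONAL MATCH (a:Node {uuid: "missing"}) CREATE (a)-[:runsOn]->(:Node)
+        -> a is bound to null, and creating an edge from a null node raises an error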
+
+For example, the newest case is a MATCH on 69DDB3CA-37DD-11E8-BF66-D9AA8AFF4A69 where no node is found in the database.
+It needs to be checked whether the node is needed before it is created (or the other way around), and why this node was never added in the first place.
+The error occurred at the next message, after 5,000,040 nodes had been inserted.
+The batch size used was 45, with 25 messages sent per second.
+(That was the best tradeoff found between execution time and amount of RAM.)
+The node would not get created by the next three messages either, because the uuid did not appear in them.
\ No newline at end of file
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Procedure_for_module_in_memgraph.py b/code/infrastructure/streaming/clients/sub/memgraph/Procedure_for_module_in_memgraph.py
index 770373532c28dd84960a88c279c64427650b8ef7..16dedb4231a1dcdfa13b7462be561d6f232c97e2 100644
--- a/code/infrastructure/streaming/clients/sub/memgraph/Procedure_for_module_in_memgraph.py
+++ b/code/infrastructure/streaming/clients/sub/memgraph/Procedure_for_module_in_memgraph.py
@@ -1,12 +1,11 @@
-# In Memgraph Lab open query Modules, new Module namen eingeben und
-# erstellen und alles automatische durch untenstehenden ersetztn
+# In Memgraph Lab, open Query Modules, create a new module and set its name,
+# then replace all of the automatically generated code with the code below
 
 #MATCH (n {uuid: 'g'})
 #CALL <Name_des_Moduls>.bfs_non_only_host_neighbors(n, 3) YIELD result
 #RETURN result;
-#Die parameter sind der Startknoten
-
+# The parameters are the start node of the search and the depth; depth 0 means the direct neighbors
 
 import mgp
@@ -28,7 +27,7 @@ def bfs_non_only_host_neighbors(context: mgp.ProcCtx, start_node: mgp.Vertex, ma
             visited.add(node)
             for edge in node.in_edges: #load edges, where the Neighbor points at current node
                 neighbor = edge.from_vertex
-                if neighbor.properties.get('nodeType') == 'Host' or 'Host' in neighbor.labels:# Der Knoten hat das Label 'Host':
+                if neighbor.properties.get('nodeType') == 'Host' or 'Host' in neighbor.labels: # the node has the label 'Host'
                     if neighbor not in visited:
                         non_host_neighbors.add(neighbor)
                     continue # Skip neighbors with nodeType 'Host'
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind.py b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind.py
index 05ef1b43b73e2c9869b0fce9cc321f21ee4dcd33..bbbc5fc7dcfe3c234bb85984dc12cec0763c042e 100644
--- a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind.py
+++ b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind.py
@@ -7,7 +7,7 @@ import os
 
 # Muster
 # Unwind -> Do For each row in nodes
-# Create All Nodes
+# Create Node
 #
 # Unwind -> Do For each Start_node_type and end_node_type combination
 # lookup Nodes of the Edge maybe with end_node_type or without
@@ -21,6 +21,8 @@ neo4j_auth=("","")
 abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
 create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
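+# A hedged sketch of the Muster above (parameter names like $rows, start_id and end_id follow the
+# batch data built below; the exact queries assembled by this file may differ):
+#   node rows, collected per label, are sent as one parameter per label:
+#     UNWIND $rows AS row CREATE (n:Subject) SET n = row
+#   edge rows, per (edge label, start label) combination:
+#     UNWIND $rows AS row
+#     MATCH (a:Subject {uuid: row.start_id}), (b {uuid: row.end_id})
+#     CREATE (a)-[:runsOn]->(b)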
+# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary.
+# The reduced dictionary is written into the target
 def flatten_obj(key, val, target):
     complexType = type(val)
     if val is None:
@@ -53,12 +55,16 @@ def flatten_obj(key, val, target):
         target[key] = val
 
 
-
+# batchDataHolder: holds the data for a whole batch
+# nodes: dictionary mapping each node label to a list of value_flat dictionaries (the node properties);
+# each element of the list represents one node
+# edges: dictionary mapping each edge label to another dictionary,
+# which maps the start_node_label to a list of edge data
+# (the uuids of start and end node; the edge label is in the key)
 class batchDataHolder:
     def __init__(self):
-        self.nodes = {} # {"properties": value_flat} all Values of a Node sorted under Nodetypes
-        self.edges = {} # keys are the relationstype value another dictonary with keys being the start node type value a list with all fitting relations
-
+        self.nodes = {} # {Node_label: [properties_node1, properties_node2]}
+        self.edges = {} # {edge_label: {start_node_label: [properties_edge1, properties_edge2]}}
 
     def add_node_data(self, node_type, node_data):
         if node_type not in self.nodes:
@@ -82,36 +88,33 @@ class batchDataHolder:
 
 
 
-nodes_count = 0
+#nodes_count = 0
 
 def parse_json_to_cypher(data_list) :
-    global nodes_count
+    #global nodes_count
 
     batchdata = batchDataHolder()
-    #{"sourceUuid": "1", "targetUuid": "2", "type": "RELATED_TO"
-    if nodes_count % 100000 == 0:
-        print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
+
+    #if nodes_count % 100000 == 0:
+    #    print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
 
     for input_string in data_list: # run throuh subgraphs of the Batch
-        nodes_count = nodes_count +1
+        # nodes_count = nodes_count +1
 
         input = json.loads(input_string)
         jsonType = list(input['datum'].keys())[0]
         # short type string
         nodeType = jsonType.rsplit(".", 1)[1]
         # data of object
-        value = input["datum"][jsonType]
+        value = input["datum"][jsonType] # value holds the properties of a node, but they must not stay nested
 
-        value_flat = {} # TODO value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden
-        flatten_obj("",value,value_flat)
-        # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph
-
-        # makes sure in with are differen identifier used (one numbercount for al kind of nodes)
+        value_flat = {} # value_flat holds the properties of a node, flattened
+        flatten_obj("",value,value_flat) # flattens the nested properties
         batchdata.add_node_data(nodeType, value_flat)
-        #nodes.append({"nodeType": nodeType, "properties": value_flat})
+
         source_uuid = value_flat['uuid']
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_edges_in_Thread.py b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_edges_in_Thread.py
index 9d3f9bb7096588b189c879fa1266112871187979..d7fdde1ababdffe97a945574d0bd58a38417a89a 100644
--- a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_edges_in_Thread.py
+++ b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_edges_in_Thread.py
@@ -8,7 +8,7 @@ import concurrent.futures
 
 # Muster
 # Unwind -> Do For each row in nodes
-# Create All Nodes
+# Create Node
 #
 # Unwind -> Do For each Start_node_type and end_node_type combination
 # lookup Nodes of the Edge maybe with end_node_type or without
@@ -22,6 +22,8 @@ neo4j_auth=("","")
 abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
 create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
+# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary.
+# The reduced dictionary is written into the target
 def flatten_obj(key, val, target):
     complexType = type(val)
     if val is None:
@@ -54,11 +56,16 @@ def flatten_obj(key, val, target):
         target[key] = val
 
 
-
+# batchDataHolder: holds the data for a whole batch
+# nodes: dictionary mapping each node label to a list of value_flat dictionaries (the node properties);
+# each element of the list represents one node
+# edges: dictionary mapping each edge label to another dictionary,
+# which maps the start_node_label to a list of edge data
+# (the uuids of start and end node; the edge label is in the key)
 class batchDataHolder:
     def __init__(self):
-        self.nodes = {} # {"properties": value_flat} all Values of a Node sorted under Nodetypes
-        self.edges = {} # keys are the relationstype value another dictonary with keys being the start node type value a list with all fitting relations
+        self.nodes = {} # {Node_label: [properties_node1, properties_node2]}
+        self.edges = {} # {edge_label: {start_node_label: [properties_edge1, properties_edge2]}}
 
 
     def add_node_data(self, node_type, node_data):
@@ -83,36 +90,40 @@ class batchDataHolder:
 
 
 
-nodes_count = 0
+#nodes_count = 0
+
+# parse_json_to_cypher:
+# 1. create variables for the whole message
+# 2. read every node to be created, together with its edges
+    #2.1 load the string as json and flatten the node properties into value_flat
+    #2.2 add the node to batchDataHolder.nodes
+    #2.3 create the relationships from the json
+    #2.4 add the edges to batchDataHolder.edges
+# 3.
return batchDataHolder.nodes and batchDataHolder.edges def parse_json_to_cypher(data_list) : - global nodes_count + #global nodes_count batchdata = batchDataHolder() - #{"sourceUuid": "1", "targetUuid": "2", "type": "RELATED_TO" - if nodes_count % 10000 == 0: - print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) + #if nodes_count % 10000 == 0: + # print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) for input_string in data_list: # run throuh subgraphs of the Batch - nodes_count = nodes_count +1 + #nodes_count = nodes_count +1 input = json.loads(input_string) jsonType = list(input['datum'].keys())[0] # short type string nodeType = jsonType.rsplit(".", 1)[1] # data of object - value = input["datum"][jsonType] + value = input["datum"][jsonType] # value are the properties of a node, but should not be nested - value_flat = {} # TODO value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden - flatten_obj("",value,value_flat) - # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph - - # makes sure in with are differen identifier used (one numbercount for al kind of nodes) + value_flat = {} # value_flat are the properties of a node, not nested + flatten_obj("",value,value_flat) # creates non nested properties batchdata.add_node_data(nodeType, value_flat) - #nodes.append({"nodeType": nodeType, "properties": value_flat}) + source_uuid = value_flat['uuid'] @@ -191,10 +202,6 @@ def parse_json_to_cypher(data_list) : for rel in relations.keys(): val = relations[rel] if val != '': - - - - batchdata.add_edge_data(rel, nodeType, {"start_id": source_uuid, "end_id": val}) diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_two_label.py b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_two_label.py index b47d7d9c4855a37db91f49f63f07c34de2b5b051..4883f332a83114814c69d74f311a04aac3efb7eb 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_two_label.py +++ b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_two_label.py @@ -7,14 +7,14 @@ import os # Muster # Unwind -> Do For each row in nodes -# Create All Nodes +# Create Node # # Unwind -> Do For each Start_node_type and end_node_type combination # lookup Nodes of the Edge maybe with end_node_type or without # Create Edge broker_hostname=str(os.getenv('mos_host',default="localhost")) -broker_port = int(os.getenv('mos_port',default=1883)) +broker_port = int(os.getenv('mos_port',default=1883)) client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"memgraph") db_uri =str(os.getenv('mem_host',default="bolt://localhost:8687")) #local test Port 8687 not local 7687 neo4j_auth=("","") @@ -22,6 +22,8 @@ abort_time_limit = int(os.getenv('abort_time_limit', default=99999)) create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes'] second_label = 'Node' +# flatten_obj receives a complex object val and reduces it into a one-level deep dictonary. 
+# The reduced dictonary is written into the target def flatten_obj(key, val, target): complexType = type(val) if val is None: @@ -52,14 +54,19 @@ def flatten_obj(key, val, target): target[f"{key}_{index}"] = value else: target[key] = val - +# batchDataHolder: holds Data for a whole batch +# nodes: dictionary with the labels as keys to a list of value_flat, the properties of Nodes, +# each element (dictionary) of the list represents one node +# edges: dictionary with labels of edge as key and as value another dictionary +# This dictionary has start_node_label as a key and the value is a list of edge Data +# (uuid of start- and end-node) (edgelabel is in key) class batchDataHolder: def __init__(self): - self.nodes = {} # {"properties": value_flat} all Values of a Node sorted under Nodetypes - self.edges = {} # keys are the relationstype value another dictonary with keys being the start node type value a list with all fitting relations - + self.nodes = {} + # {Node_label: [properties_node1, properties_node2]} (properties are dictionaries created from flatten_obj) + self.edges = {} # {edge_label: {start_node_label: [properties_edge1, propertes_edge2]}} def add_node_data(self, node_type, node_data): if node_type not in self.nodes: @@ -83,36 +90,41 @@ class batchDataHolder: -nodes_count = 0 +#nodes_count = 0 +#parse_json_to_cypher: first create Variables for the whole message +# 2. Read every create node and its edges + #2.1 load string as json and transform the node properties to non nested value_flat + #2.2 add node as to batchDataHolder.nodes + #2.2 create relationships from the json + #2.3 add edge as to batchDataHolder.edges +# 4. return batchDataHolder.nodes and batchDataHolder.edges def parse_json_to_cypher(data_list) : - global nodes_count + #global nodes_count batchdata = batchDataHolder() - #{"sourceUuid": "1", "targetUuid": "2", "type": "RELATED_TO" - if nodes_count % 100000 == 0: - print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) + + #if nodes_count % 100000 == 0: + # print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) for input_string in data_list: # run throuh subgraphs of the Batch - nodes_count = nodes_count +1 + #nodes_count = nodes_count +1 input = json.loads(input_string) jsonType = list(input['datum'].keys())[0] # short type string nodeType = jsonType.rsplit(".", 1)[1] # data of object - value = input["datum"][jsonType] + value = input["datum"][jsonType] # value are the properties of a node, but should not be nested - value_flat = {} # TODO value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden - flatten_obj("",value,value_flat) - # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph + value_flat = {} # value_flat are the properties of a node, not nested + flatten_obj("",value,value_flat) # creates non nested properties - # makes sure in with are differen identifier used (one numbercount for al kind of nodes) batchdata.add_node_data(nodeType, value_flat) - #nodes.append({"nodeType": nodeType, "properties": value_flat}) + source_uuid = value_flat['uuid'] @@ -337,10 +349,10 @@ client.on_connect = on_connect client.on_message = on_message client.failed_connect = False -client.connect(broker_hostname, broker_port,keepalive=3600*4) +client.connect(broker_hostname, broker_port,keepalive=3600*4) client.loop_start() -# this try-finally block ensures that whenever we terminate the program earlier by hitting 
ctrl+c, it still gracefully exits
+# this try-finally block ensures that whenever we terminate the program early by hitting ctrl+c, it still gracefully exits
 try:
     i = 0
     while i < abort_time_limit: #and client.failed_connect == False:
@@ -351,6 +363,6 @@ try:
 
 finally:
     client.disconnect()
-    client.loop_stop()
+    client.loop_stop()
     driver.close()
 
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_using_procedure_node.py b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_using_procedure_node.py
index dd4e2dd9ec91e57691f93fe22dbc5d6d4d983668..d7607d90904a90c4d000570bf5fcad22b8f467ad 100644
--- a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_using_procedure_node.py
+++ b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_using_procedure_node.py
@@ -7,7 +7,7 @@ import os
 
 # Muster
 # Unwind -> Do For each row in nodes
-# Create All Nodes
+# Create Node using create.node
 #
 # Unwind -> Do For each Start_node_type and end_node_type combination
 # lookup Nodes of the Edge maybe with end_node_type or without
@@ -21,6 +21,8 @@ neo4j_auth=("","")
 abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
 create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
+# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary.
+# The reduced dictionary is written into the target
 def flatten_obj(key, val, target):
     complexType = type(val)
     if val is None:
@@ -53,12 +55,15 @@ def flatten_obj(key, val, target):
     target[key] = val
 
 
-
+# batchDataHolder: holds the data for a whole batch
+# nodes: list of dictionaries, each representing one node with its label (type) and properties
+# edges: dictionary mapping each edge label to another dictionary,
+# which maps the start_node_label to a list of edge data
+# (the uuids of start and end node; the edge label is in the key)
 class batchDataHolder:
     def __init__(self):
-        self.nodes = [] # {"properties": value_flat} all Values of a Node sorted under Nodetypes
-        self.edges = {} # keys are the relationstype value another dictonary with keys being the start node type value a list with all fitting relations
-
+        self.nodes = [] # [{"data": properties_node1, "type": label1}, ...] (kept as a list so add_node_data can append)
+        self.edges = {} # {edge_label: {start_node_label: [properties_edge1, properties_edge2]}}
 
     def add_node_data(self, node_type, node_data):
         node = {"data": node_data, "type": node_type}
@@ -82,36 +87,34 @@ class batchDataHolder:
 
 
-nodes_count = 0
+#nodes_count = 0
 
 def parse_json_to_cypher(data_list) :
-    global nodes_count
+    #global nodes_count
 
     batchdata = batchDataHolder()
-    #{"sourceUuid": "1", "targetUuid": "2", "type": "RELATED_TO"
-    if nodes_count % 100000 == 0:
-        print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
+
+    #if nodes_count % 100000 == 0:
+    #    print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
 
     for input_string in data_list: # run throuh subgraphs of the Batch
-        nodes_count = nodes_count +1
+        #nodes_count = nodes_count +1
 
         input = json.loads(input_string)
         jsonType = list(input['datum'].keys())[0]
         # short type string
         nodeType = jsonType.rsplit(".", 1)[1]
         # data of object
-        value = input["datum"][jsonType]
+        value = input["datum"][jsonType] # value holds the properties of a node, but they must not stay nested
 
-        value_flat = {} # TODO
value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden - flatten_obj("",value,value_flat) - # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph + value_flat = {} # value_flat are the properties of a node, not nested + flatten_obj("",value,value_flat) # creates non nested properties - # makes sure in with are differen identifier used (one numbercount for al kind of nodes) batchdata.add_node_data(nodeType, value_flat) - #nodes.append({"nodeType": nodeType, "properties": value_flat}) + source_uuid = value_flat['uuid'] diff --git a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_using_procedure_nodes.py b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_using_procedure_nodes.py index 22af87e812f55ee314b547896036ae4cee23a34d..dfbe054bae1c35047420ed9e5963a8726056846b 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_using_procedure_nodes.py +++ b/code/infrastructure/streaming/clients/sub/memgraph/Unwind/sub_mem_whole_batch_using_unwind_using_procedure_nodes.py @@ -6,8 +6,7 @@ from neo4j import GraphDatabase import os # Muster -# Unwind -> Do For each row in nodes -# Create All Nodes +# Create All Nodes using create.nodes() # # Unwind -> Do For each Start_node_type and end_node_type combination # lookup Nodes of the Edge maybe with end_node_type or without @@ -21,6 +20,8 @@ neo4j_auth=("","") abort_time_limit = int(os.getenv('abort_time_limit', default=99999)) create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes'] +# flatten_obj receives a complex object val and reduces it into a one-level deep dictonary. 
+# The reduced dictonary is written into the target def flatten_obj(key, val, target): complexType = type(val) if val is None: @@ -53,12 +54,16 @@ def flatten_obj(key, val, target): target[key] = val - +# batchDataHolder: holds Data for a whole batch +# nodes: dictionary with the labels as keys to a list of value_flat, the properties of Nodes, +# each element (dictionary) of the list represents one node +# edges: dictionary with labels of edge as key and as value another dictionary +# This dictionary has start_node_label as a key and the value is a list of edge Data +# (uuid of start- and end-node) (edgelabel is in key) class batchDataHolder: def __init__(self): - self.nodes = {} # {"properties": value_flat} all Values of a Node sorted under Nodetypes - self.edges = {} # keys are the relationstype value another dictonary with keys being the start node type value a list with all fitting relations - + self.nodes = {} # {Node_label: [properties_node1, properties_node2]} + self.edges = {} # {edge_label: {start_node_label: [properties_edge1, propertes_edge2]}} def add_node_data(self, node_type, node_data): if node_type not in self.nodes: @@ -82,36 +87,34 @@ class batchDataHolder: -nodes_count = 0 +#nodes_count = 0 def parse_json_to_cypher(data_list) : - global nodes_count + #global nodes_count batchdata = batchDataHolder() - #{"sourceUuid": "1", "targetUuid": "2", "type": "RELATED_TO" - if nodes_count % 100000 == 0: - print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) + + #if nodes_count % 100000 == 0: + #print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) for input_string in data_list: # run throuh subgraphs of the Batch - nodes_count = nodes_count +1 + #nodes_count = nodes_count +1 input = json.loads(input_string) jsonType = list(input['datum'].keys())[0] # short type string nodeType = jsonType.rsplit(".", 1)[1] # data of object - value = input["datum"][jsonType] + value = input["datum"][jsonType] # value are the properties of a node, but should not be nested - value_flat = {} # TODO value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden - flatten_obj("",value,value_flat) - # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph + value_flat = {} # value_flat are the properties of a node, not nested + flatten_obj("",value,value_flat) # creates non nested properties - # makes sure in with are differen identifier used (one numbercount for al kind of nodes) batchdata.add_node_data(nodeType, value_flat) - #nodes.append({"nodeType": nodeType, "properties": value_flat}) + source_uuid = value_flat['uuid'] diff --git a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented.py b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented.py index e639b9802ba71b320ea1c835562e764b618b35c0..d5f2ba27598ac717db12794e3d3be88c8d98bbf1 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented.py +++ b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented.py @@ -6,8 +6,8 @@ from neo4j import GraphDatabase import os # Muster -# CREATE ALL nodes (Using Param) # MATCH all lookups (not known yet) +# CREATE all nodes (Using Param) # CREATE all EDGES broker_hostname=str(os.getenv('mos_host',default="localhost")) @@ -20,6 +20,8 @@ create_indices = True 
#os.getenv('create_indices', 'False').lower() in ['true', on_disk = False analytic = True +# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary. +# The reduced dictionary is written into the target def flatten_obj(key, val, target): complexType = type(val) if val is None: @@ -55,13 +57,15 @@ def flatten_obj(key, val, target): -# Data for the whole Batch, so it doesnt get copied when used in another function -# Holds a List with all known identifier, a directory to look up if the Node is already known -# Holds the Batch query -# Holds a number to create diffrent idetifier +# batchDataHolder: holds Data for the whole Batch and manages following variables +# knownNodes: holds a dictionary with all known Nodes, uses uuid as key and identifier as value +# create_Nodes: holds a list of Cypher string parts to create the new nodes +# identifierNumber: holds a number to create unique identifier +# lookup_nodes: holds a list of Cypher string parts to lookup nodes +# insert_relations: holds a list of Cypher string parts to create edges -# Holds a list with the Nodes to match and a list with relations to add -# manages the build of these lists +# The get method of each list variable creates a whole String from the parts, these Strings must append a command +# in Cypher MATCH, MERGE, CREATE and so on class batchDataHolder: def __init__(self): self.knownNodes = dict() @@ -119,7 +123,8 @@ class batchDataHolder: def get_create_Nodes(self): return ', '.join(self.create_Nodes) - + # Check if target Node is already known, if not create a lookup. + # use the new or old identifier and add edge String. def nodes_relations_to_cypher( self, val, newIdentifier, rel, type_letter, type): #type either ':Host' 'Subject' .... or '' try: if self.check_key(val): # Node is already known use the same identifier @@ -134,19 +139,25 @@ class batchDataHolder: print(type_letter) -nodes_count = 0 +#nodes_count = 0 +#parse_json_to_cypher: first create Variables for the whole message +# 2. Read every create node and its edges + #2.1 load string as json and transform the node properties to non nested value_flat + #2.2 create relationships from the json + #2.3 transform relationships to Cypher (make use of known Nodes) + #2.4 transform node to Cypher +# 3. put relation and node strings together to create one big Cypher query +# 4. 
return query and maybe the parameter def parse_json_to_cypher(data_list) : - global nodes_count + #global nodes_count + + #if nodes_count % 10000 == 0: + # print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) - if nodes_count % 10000 == 0: - print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) - - #nodes_count = nodes_count - #knownNodes = dict() #to check if Node is known and no Lookup is needed batchData = batchDataHolder() - all_values = {} + all_values = {}# Dictionary holds the key, written in the query and the value to be inserted for input_string in data_list: # run throuh subgraphs of the Batch @@ -158,13 +169,14 @@ def parse_json_to_cypher(data_list) : # short type string nodeType = jsonType.rsplit(".", 1)[1] # data of object - value = input["datum"][jsonType] + value = input["datum"][jsonType] # value are the properties of a node, but should not be nested - value_flat = {} # TODO value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden - flatten_obj("",value,value_flat) - # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph + value_flat = {} # value_flat are the properties of a node, not nested + flatten_obj("",value,value_flat) # creates non nested properties + + # newIdentifier is the alias of the new node in the query newIdentifier = batchData.get_Identifier_Increment_add_Node('_', value_flat['uuid']) - # makes sure in with are differen identifier used (one numbercount for al kind of nodes) + all_values[newIdentifier] = value_flat relations = dict( @@ -238,7 +250,7 @@ def parse_json_to_cypher(data_list) : - # lookup existing nodes for relations + # relationships to cypher for rel in relations.keys(): val = relations[rel] if val != '': diff --git a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_no_Label.py b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_no_Label.py index 53eca40fff3cedbb926632112ab87db528e64217..1a60f1e20aee58976f24a5fe151e9da34e657173 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_no_Label.py +++ b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_no_Label.py @@ -6,8 +6,8 @@ from neo4j import GraphDatabase import os # Muster -# CREATE ALL nodes (Using Param) # MATCH all lookups (not known yet) +# CREATE all nodes (Using Param) # CREATE all EDGES broker_hostname=str(os.getenv('mos_host',default="localhost")) @@ -20,6 +20,8 @@ create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', on_disk = False create_noteType = True # second index only if create_indices true +# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary. 
+# The reduced dictionary is written into the target def flatten_obj(key, val, target): complexType = type(val) if val is None: @@ -55,22 +57,25 @@ def flatten_obj(key, val, target): -# Data for the whole Batch, so it doesnt get copied when used in another function -# Holds a List with all known identifier, a directory to look up if the Node is already known -# Holds the Batch query -# Holds a number to create diffrent idetifier +# batchDataHolder: holds Data for the whole Batch and manages following variables +# knownNodes: holds a dictionary with all known Nodes, uses uuid as key and identifier as value +# create_Nodes: holds a list of Cypher string parts to create the new nodes +# identifierNumber: holds a number to create unique identifier +# lookup_nodes: holds a list of Cypher string parts to lookup nodes +# insert_relations: holds a list of Cypher string parts to create edges + +# The get method of each list variable creates a whole String from the parts, these Strings must append a command +# in Cypher MATCH, MERGE, CREATE and so on -# Holds a list with the Nodes to match and a list with relations to add -# manages the build of these lists class batchDataHolder: def __init__(self): - self.knownNodes = dict() + self.knownNodes = dict() #key is uuid, value is identifier self.create_Nodes = [] self.identifierNumber = 0 #number to create unique identifier in the query self.lookup_nodes = [] self.insert_relations = [] - def add_entry_Node(self, key, value): + def add_entry_Node(self, key, value): #key is uuid, value is identifier self.knownNodes[key] = value def get_lookup_nodes(self): @@ -107,8 +112,8 @@ class batchDataHolder: def append_with(self, ident): self.with_list_identifier.append(ident) - def get_Identifier_Increment_add_Node(self, type, key):#create a new identifier - identifier = type + str(self.identifierNumber) + def get_Identifier_Increment_add_Node(self, type, key):#create a new identifier # key is the uuid + identifier = type + str(self.identifierNumber) self.identifierNumber += 1 self.add_entry_Node(key, identifier) return identifier @@ -119,7 +124,8 @@ class batchDataHolder: def get_create_Nodes(self): return ', '.join(self.create_Nodes) - + # Check if target Node is already known, if not create a lookup. + # use the new or old identifier and add edge String. def nodes_relations_to_cypher( self, val, newIdentifier, rel, type_letter, type): #type either ':Host' 'Subject' .... or '' try: if self.check_key(val): # Node is already known use the same identifier @@ -127,45 +133,51 @@ class batchDataHolder: self.insert_relations.append(f'''({newIdentifier}) -[:{rel}]-> ({ident})''') else : ident = self.get_Identifier_Increment_add_Node(type_letter, val) # get identifier and add to knownNode - s = f'({ident}:Node {{uuid: "{val}"}})' + s = f'({ident}:Node {{uuid: "{val}"}})' #Cypher lookup string self.lookup_nodes.append(s) self.insert_relations.append(f'''({newIdentifier}) -[:{rel}]-> ({ident})''') except: print(type_letter) -nodes_count = 0 +#nodes_count = 0 +#parse_json_to_cypher: first create Variables for the whole message +# 2. Read every create node and its edges + #2.1 load string as json and transform the node properties to non nested value_flat + #2.2 create relationships from the json + #2.3 transform relationships to Cypher (make use of known Nodes) + #2.4 transform node to Cypher +# 3. put relation and node strings together to create one big Cypher query +# 4. 
return query and maybe the parameter def parse_json_to_cypher(data_list) : - global nodes_count + #global nodes_count - if nodes_count % 10000 == 0: - print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) - - #nodes_count = nodes_count + #if nodes_count % 10000 == 0: + # print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) - #knownNodes = dict() #to check if Node is known and no Lookup is needed batchData = batchDataHolder() - all_values = {} + all_values = {} # Dictionary holds the key, written in the query and the value to be inserted for input_string in data_list: # run throuh subgraphs of the Batch - nodes_count = nodes_count +1 + #nodes_count = nodes_count +1 input = json.loads(input_string) jsonType = list(input['datum'].keys())[0] # short type string nodeType = jsonType.rsplit(".", 1)[1] # data of object - value = input["datum"][jsonType] + value = input["datum"][jsonType] # value are the properties of a node, but should not be nested - value_flat = {} # TODO value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden - flatten_obj("",value,value_flat) - # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph + value_flat = {} # value_flat are the properties of a node, not nested + flatten_obj("",value,value_flat) # creates non nested properties + + # newIdentifier is the alias of the new node in the query newIdentifier = batchData.get_Identifier_Increment_add_Node('_', value_flat['uuid']) - # makes sure in with are differen identifier used (one numbercount for al kind of nodes) - value_flat['nodeType'] = nodeType #adding the NodeType as an value to add as Param + + value_flat['nodeType'] = nodeType #adding the NodeType as an value to add as a property with a parameter all_values[newIdentifier] = value_flat relations = dict( @@ -239,7 +251,7 @@ def parse_json_to_cypher(data_list) : - # lookup existing nodes for relations + # relationships to cypher for rel in relations.keys(): val = relations[rel] if val != '': @@ -280,11 +292,14 @@ def parse_json_to_cypher(data_list) : print('relations: ', relations) break - - if batchData.is_empty_lookup_nodes(): - query = f""" CREATE {batchData.get_create_Nodes()} CREATE {batchData.get_insert_relations()}""" - else: + if batchData.is_empty_lookup_nodes(): # No Lookup (lookup needed = edges are being created) + if batchData.is_empty_insert_relations(): # No Lookup, maybe no edges, most likely edges between the created nodes + query = f""" CREATE {batchData.get_create_Nodes()}""" + else: + query = f""" CREATE {batchData.get_create_Nodes()} CREATE {batchData.get_insert_relations()}""" + else: # Most likely lookup needed and edges being created query = f""" MATCH {batchData.get_lookup_nodes()} CREATE {batchData.get_create_Nodes()} CREATE {batchData.get_insert_relations()}""" + # print() # print('query: ', query) # print() diff --git a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_no_Params.py b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_no_Params.py index f54e7701d6801200b73adde64b21963ec88b94e0..45f39d7d60f718b31a0f54d80ef94028694133df 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_no_Params.py +++ b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_no_Params.py @@ 
-6,8 +6,8 @@ from neo4j import GraphDatabase import os # Muster -# CREATE ALL nodes (Using Param) # MATCH all lookups (not known yet) +# CREATE all nodes (not using Param) # CREATE all EDGES #Error at 574 750 - 575 000 , creates /' escape zeichen macht Query Anfrage ungültig verrsucht dreckig alle /' durch ' zu ersetzen prüfen. @@ -22,6 +22,8 @@ abort_time_limit = int(os.getenv('abort_time_limit', default=99999)) create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes'] on_disk = False +# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary. +# The reduced dictionary is written into the target def flatten_obj(key, val, target): complexType = type(val) if val is None: @@ -57,13 +59,15 @@ def flatten_obj(key, val, target): -# Data for the whole Batch, so it doesnt get copied when used in another function -# Holds a List with all known identifier, a directory to look up if the Node is already known -# Holds the Batch query -# Holds a number to create diffrent idetifier +# batchDataHolder: holds Data for the whole Batch and manages following variables +# knownNodes: holds a dictionary with all known Nodes, uses uuid as key and identifier as value +# create_Nodes: holds a list of Cypher string parts to create the new nodes +# identifierNumber: holds a number to create unique identifier +# lookup_nodes: holds a list of Cypher string parts to lookup nodes +# insert_relations: holds a list of Cypher string parts to create edges -# Holds a list with the Nodes to match and a list with relations to add -# manages the build of these lists +# The get method of each list variable creates a whole String from the parts, these Strings must append a command +# in Cypher MATCH, MERGE, CREATE and so on class batchDataHolder: def __init__(self): self.knownNodes = dict() @@ -121,7 +125,8 @@ class batchDataHolder: def get_create_Nodes(self): return ', '.join(self.create_Nodes) - + # Check if target Node is already known, if not create a lookup. + # use the new or old identifier and add edge String. def nodes_relations_to_cypher( self, val, newIdentifier, rel, type_letter, type): #type either ':Host' 'Subject' .... or '' try: if self.check_key(val): # Node is already known use the same identifier @@ -136,26 +141,29 @@ class batchDataHolder: print(type_letter) -nodes_count = 0 +#nodes_count = 0 +#parse_json_to_cypher: first create Variables for the whole message +# 2. Read every create node and its edges + #2.1 load string as json and transform the node properties to non nested value_flat + #2.2 create relationships from the json + #2.3 transform relationships to Cypher (make use of known Nodes) + #2.4 transform node to Cypher +# 3. put relation and node strings together to create one big Cypher query +# 4. 
return query and maybe the parameter def parse_json_to_cypher(data_list) : - global nodes_count - - if nodes_count % 10000 == 0: - print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) - - #nodes_count = nodes_count + #global nodes_count + #if nodes_count % 10000 == 0: + # print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) - - #knownNodes = dict() #to check if Node is known and no Lookup is needed batchData = batchDataHolder() - all_values = {} + for input_string in data_list: # run throuh subgraphs of the Batch - nodes_count = nodes_count +1 + #nodes_count = nodes_count +1 input = json.loads(input_string) jsonType = list(input['datum'].keys())[0] @@ -163,14 +171,13 @@ def parse_json_to_cypher(data_list) : # short type string nodeType = jsonType.rsplit(".", 1)[1] # data of object - value = input["datum"][jsonType] + value = input["datum"][jsonType] # value are the properties of a node, but should not be nested + + value_flat = {} # value_flat are the properties of a node, not nested + flatten_obj("",value,value_flat) # creates non nested properties - value_flat = {} # TODO value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden - flatten_obj("",value,value_flat) - # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph + # newIdentifier is the alias of the new node in the query newIdentifier = batchData.get_Identifier_Increment_add_Node('_', value_flat['uuid']) - # makes sure in with are differen identifier used (one numbercount for al kind of nodes) - all_values[newIdentifier] = value_flat relations = dict( runsOn="" @@ -243,7 +250,7 @@ def parse_json_to_cypher(data_list) : - # lookup existing nodes for relations + # relationships to cypher for rel in relations.keys(): val = relations[rel] if val != '': @@ -277,15 +284,14 @@ def parse_json_to_cypher(data_list) : elif rel == 'hasLocalPrincipal': batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'not', '') - attribute_string = [] - #', '.join([f"{k}: '{v}'" for k, v in value_flat.items()]) + attribute_string = [] # holds list of strings with the properties of a node - for k, v in value_flat.items(): # Makes Sure only the Strings are handelded as Strings + for k, v in value_flat.items(): # transforms value_flat dictionary to string, attention to non string elements if isinstance(v, str): attribute_string.append(f"{k}: '{v}'") else: attribute_string.append(f"{k}: {v}") - + #adding the properties as String batchData.append_create_Nodes(f"""({newIdentifier}:{nodeType} {{{','.join(attribute_string)}}})""") except: print('Exception') @@ -293,25 +299,29 @@ def parse_json_to_cypher(data_list) : print('relations: ', relations) break - - if batchData.is_empty_lookup_nodes(): - query = f""" CREATE {batchData.get_create_Nodes()} CREATE {batchData.get_insert_relations()}""" - else: + # checks if a Lookup is needed or relations are being created and creates Query respectively out of the Query parts + + if batchData.is_empty_lookup_nodes(): # No Lookup (lookup needed = edges are being created) + if batchData.is_empty_insert_relations(): # No Lookup, maybe no edges, most likely edges between the created nodes + query = f""" CREATE {batchData.get_create_Nodes()}""" + else: + query = f""" CREATE {batchData.get_create_Nodes()} CREATE {batchData.get_insert_relations()}""" + else: # Most likely lookup needed and edges being created query = f""" MATCH {batchData.get_lookup_nodes()} 
CREATE {batchData.get_create_Nodes()} CREATE {batchData.get_insert_relations()}""" # print() # print('query: ', query) # print() - # print('attributes: ', all_values) + # print(nodes_count) - return query, all_values + return query def create_cypher_query_from_cdm(json): ''' Create Cypher Queries from publisher message ''' - query, value = parse_json_to_cypher(json) - return query, value + query = parse_json_to_cypher(json) + return query def on_message(client, userdata, message): ''' @@ -319,8 +329,8 @@ def on_message(client, userdata, message): ''' data = json.loads(message.payload.decode("utf-8")) print("Received message from: ",message.topic) - q, value = create_cypher_query_from_cdm(data) - execute_query(q, value) + q = create_cypher_query_from_cdm(data) + execute_query(q) def on_connect(client, userdata, flags, return_code): ''' @@ -360,7 +370,7 @@ def connect_to_db(uri,auth): return driver -def execute_query(query:str, value): +def execute_query(query:str): ''' Execute any Neo4j Query. diff --git a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_two_label.py b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_two_label.py index 637e415fcb7f7da95d011fc4048076e600af22fd..95746a560f31e37930a04bac1c9a179250a3471d 100644 --- a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_two_label.py +++ b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_two_label.py @@ -6,8 +6,8 @@ from neo4j import GraphDatabase import os # Muster -# CREATE ALL nodes (Using Param) # MATCH all lookups (not known yet) +# CREATE all nodes (Using Param) # CREATE all EDGES broker_hostname=str(os.getenv('mos_host',default="localhost")) @@ -21,6 +21,8 @@ on_disk = False second_label = 'Node' analytic = False +# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary. +# The reduced dictionary is written into the target def flatten_obj(key, val, target): complexType = type(val) if val is None: @@ -56,13 +58,15 @@ def flatten_obj(key, val, target): -# Data for the whole Batch, so it doesnt get copied when used in another function -# Holds a List with all known identifier, a directory to look up if the Node is already known -# Holds the Batch query -# Holds a number to create diffrent idetifier +# batchDataHolder: holds Data for the whole Batch and manages following variables +# knownNodes: holds a dictionary with all known Nodes, uses uuid as key and identifier as value +# create_Nodes: holds a list of Cypher string parts to create the new nodes +# identifierNumber: holds a number to create unique identifier +# lookup_nodes: holds a list of Cypher string parts to lookup nodes +# insert_relations: holds a list of Cypher string parts to create edges -# Holds a list with the Nodes to match and a list with relations to add -# manages the build of these lists +# The get method of each list variable creates a whole String from the parts, these Strings must append a command +# in Cypher MATCH, MERGE, CREATE and so on class batchDataHolder: def __init__(self): self.knownNodes = dict() @@ -120,7 +124,8 @@ class batchDataHolder: def get_create_Nodes(self): return ', '.join(self.create_Nodes) - + # Check if target Node is already known, if not create a lookup. + # use the new or old identifier and add edge String. 
def nodes_relations_to_cypher( self, val, newIdentifier, rel, type_letter, type): #type either ':Host' 'Subject' .... or '' try: if self.check_key(val): # Node is already known use the same identifier @@ -135,37 +140,44 @@ class batchDataHolder: print(type_letter) -nodes_count = 0 +#nodes_count = 0 +#parse_json_to_cypher: first create Variables for the whole message +# 2. Read every create node and its edges + #2.1 load string as json and transform the node properties to non nested value_flat + #2.2 create relationships from the json + #2.3 transform relationships to Cypher (make use of known Nodes) + #2.4 transform node to Cypher +# 3. put relation and node strings together to create one big Cypher query +# 4. return query and maybe the parameter def parse_json_to_cypher(data_list) : - global nodes_count + #global nodes_count + + #if nodes_count % 10000 == 0: + # print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) - if nodes_count % 10000 == 0: - print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S")) - - - #nodes_count = nodes_count - #knownNodes = dict() #to check if Node is known and no Lookup is needed batchData = batchDataHolder() - all_values = {} + all_values = {}# Dictionary holds the key, written in the query and the value to be inserted + for input_string in data_list: # run throuh subgraphs of the Batch - nodes_count = nodes_count +1 + #nodes_count = nodes_count +1 input = json.loads(input_string) jsonType = list(input['datum'].keys())[0] # short type string nodeType = jsonType.rsplit(".", 1)[1] # data of object - value = input["datum"][jsonType] + value = input["datum"][jsonType] # value are the properties of a node, but should not be nested + + value_flat = {} # value_flat are the properties of a node, not nested + flatten_obj("",value,value_flat) # creates non nested properties - value_flat = {} # TODO value_flat sind die Attribute eines Knotens, jedoch müssen alle Attribute aller neuen Knoten eingefügt werden - flatten_obj("",value,value_flat) - # newIdentifier is the central node of the current Subgraph, each line of the Batch is own new Subgraph + # newIdentifier is the alias of the new node in the query newIdentifier = batchData.get_Identifier_Increment_add_Node('_', value_flat['uuid']) - # makes sure in with are differen identifier used (one numbercount for al kind of nodes) + all_values[newIdentifier] = value_flat relations = dict( @@ -239,7 +251,7 @@ def parse_json_to_cypher(data_list) : - # lookup existing nodes for relations + # relationships to cypher for rel in relations.keys(): val = relations[rel] if val != '': diff --git a/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_two_label_internal_collision_handle.py b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_two_label_internal_collision_handle.py new file mode 100644 index 0000000000000000000000000000000000000000..b2eba2663b81c7484a822d19a30e234a8019da21 --- /dev/null +++ b/code/infrastructure/streaming/clients/sub/memgraph/kind_oriented/sub_mem_whole_batch_kind_oriented_two_label_internal_collision_handle.py @@ -0,0 +1,429 @@ +import datetime +import paho.mqtt.client as mqtt +import time +import json +from neo4j import GraphDatabase +import os + +# Muster +# OPTIONAL MATCH all lookups (not known yet) OPTIONAL to get errors +# CREATE all nodes (Using Param) +# CREATE all EDGES + + +broker_hostname=str(os.getenv('mos_host',default="localhost")) 
+broker_port = int(os.getenv('mos_port',default=1883))
+client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"memgraph")
+db_uri =str(os.getenv('mem_host',default="bolt://localhost:8687")) #local test port 8687, not the local 7687
+neo4j_auth=("","")
+abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
+create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
+on_disk = False
+second_label = 'Node'
+analytic = False
+
+# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary.
+# The reduced dictionary is written into the target
+def flatten_obj(key, val, target):
+    complexType = type(val)
+    if val is None:
+        return None # error, the reason why the discrepancy occurs
+    elif complexType is dict:
+        for k, v in val.items():
+            if "com.bbn.tc.schema.avro.cdm18.UUID" not in k: # node for Edge not needed as Attribute
+                new_key = f"{key}_{k}" if key else k
+                flatten_obj(new_key, v, target)
+    elif complexType is list:
+        for i in range(len(val)):
+            v = val[i]
+            new_key = f"{key}_{i}" if key else str(i)
+            flatten_obj(new_key, v, target)
+    elif complexType is object:
+        for k, v in val.__dict__.items():
+            new_key = f"{key}_{k}" if key else k
+            flatten_obj(new_key, v, target)
+    else:
+        if "properties_map_arg_mem_flags" in key:
+            # string to list (via eval) and make separate attribute/value pairs for the objects of the list
+            values_list = eval(val)
+            cleaned_values = [value.strip("'") for value in values_list]
+
+            index = 0
+            for value in cleaned_values:
+                index += 1
+                target[f"{key}_{index}"] = value
+        else:
+            target[key] = val
+
+
+
+
+
+# batchDataHolder: holds the data for the whole batch and manages the following variables
+# knownNodes: a dictionary of all known nodes, keyed by uuid, with the identifier as value
+# create_Nodes: a list of Cypher string parts that create the new nodes
+# identifierNumber: a counter used to create unique identifiers
+# lookup_nodes: a list of Cypher string parts that look up nodes
+# insert_relations: a list of Cypher string parts that create edges
+
+# The get method of each list variable joins the parts into one string; each such string
+# must be appended to a Cypher command such as MATCH, MERGE or CREATE
+class batchDataHolder:
+    def __init__(self):
+
+        self.knownNodes = dict()
+        self.create_Nodes = []
+        self.identifierNumber = 0 #number to create unique identifiers in the query
+        self.lookup_nodes = []
+        self.insert_relations = []
+
+    def add_entry_Node(self, key, value):
+        self.knownNodes[key] = value
+
+    def get_lookup_nodes(self):
+        return ', '.join(self.lookup_nodes)
+
+    def get_insert_relations(self):
+        return ', '.join(self.insert_relations)
+
+    def is_empty_lookup_nodes(self):
+        if self.lookup_nodes:
+            return False
+        else:
+            return True
+
+    def is_empty_insert_relations(self):
+        if self.insert_relations:
+            return False
+        else:
+            return True
+
+
+    def get_knownNodes(self):
+        return self.knownNodes
+
+    def get_Node_value(self, key):
+        return self.knownNodes[key]
+
+    def check_key(self, key):
+        if key in self.knownNodes:
+            return True
+        else:
+            return False
+
+
+    def append_with(self, ident):
+        self.with_list_identifier.append(ident)
+
+    def get_Identifier_Increment_add_Node(self, type, key): #create a new identifier
+        if self.check_key(key): #try to make sure that no collision within one query is possible; only new part, lines 113-121
+            for i, value in enumerate(self.lookup_nodes):
+                if key in value:
+                    del self.lookup_nodes[i] #delete the node from the MATCH clause
+
+            identifier = self.get_Node_value(key) #use the same identifier
+            return identifier
+
+        else :
+            identifier = type + str(self.identifierNumber)
+            self.identifierNumber += 1
+            self.add_entry_Node(key, identifier)
+            return identifier
+
+    def append_create_Nodes(self, add):
+        self.create_Nodes.append(add)
+
+    def get_create_Nodes(self):
+        return ', '.join(self.create_Nodes)
+
+    # Check if the target node is already known; if not, create a lookup.
+    # Use the new or old identifier and append the edge string.
+    def nodes_relations_to_cypher( self, val, newIdentifier, rel, type_letter, type): #type either ':Host' 'Subject' .... or ''
+        try:
+            if self.check_key(val): # Node is already known, use the same identifier
+                ident = self.get_Node_value(val)
+                self.insert_relations.append(f'''({newIdentifier}) -[:{rel}]-> ({ident})''')
+            else :
+                ident = self.get_Identifier_Increment_add_Node(type_letter, val) # get identifier and add to knownNodes
+                s = f'({ident}{type} {{uuid: "{val}"}})'
+                self.lookup_nodes.append(s)
+                self.insert_relations.append(f'''({newIdentifier}) -[:{rel}]-> ({ident})''')
+        except:
+            print(type_letter)
+
+
+#nodes_count = 0
+
+#parse_json_to_cypher: 1. create variables for the whole message
+# 2. read every node to create and its edges
+    #2.1 load the string as json and flatten the node properties into the non-nested value_flat
+    #2.2 create the relationships from the json
+    #2.3 transform the relationships to Cypher (making use of known nodes)
+    #2.4 transform the node to Cypher
+# 3. put the relation and node strings together to create one big Cypher query
+# 4. return the query and the parameters
+def parse_json_to_cypher(data_list) :
+    #global nodes_count
+
+    #if nodes_count % 10000 == 0:
+    #    print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
+
+    batchData = batchDataHolder()
+    all_values = {} # maps the identifier written in the query to the property values to be inserted
+
+    for input_string in data_list: # run through the subgraphs of the Batch
+
+        #nodes_count = nodes_count +1
+
+
+        input = json.loads(input_string)
+        jsonType = list(input['datum'].keys())[0]
+        # short type string
+        nodeType = jsonType.rsplit(".", 1)[1]
+        # data of object
+        value = input["datum"][jsonType] # value holds the properties of a node, possibly still nested
+
+        value_flat = {} # value_flat holds the properties of a node, not nested
+        flatten_obj("",value,value_flat) # creates non-nested properties
+
+        # newIdentifier is the alias of the new node in the query
+        newIdentifier = batchData.get_Identifier_Increment_add_Node('_', value_flat['uuid'])
+
+
+        all_values[newIdentifier] = value_flat
+
+        relations = dict(
+            runsOn=""
+            ,isGeneratedBy=""
+            ,affects=""
+            ,affects2=""
+            ,residesOn=""
+            ,isPartOf="" # not in data set
+            ,hasOwningPricipal=""
+            ,hasTag="" # not in data set
+            ,hasParent=""
+            ,hasAccountOn=""
+            ,hasLocalPrincipal=""
+        )
+
+
+
+
+        # create relationships
+        try:
+            if nodeType == 'Subject':
+                if value['parentSubject'] != None:
+                    relations.update({'hasParent':value['parentSubject']["com.bbn.tc.schema.avro.cdm18.UUID"]})
+                if value['hostId'] != None:
+                    relations.update({'runsOn':value['hostId']})
+                if value['localPrincipal'] != None:
+                    relations.update({'hasLocalPrincipal':value['localPrincipal']})
+                # the relationship for subject -[affects]-> event is missing... probably implicit through isGeneratedBy
+
+            elif nodeType == 'FileObject':
+                if value['baseObject'] != None:
+                    relations.update({"residesOn":value['baseObject']['hostId']})
+                # relations.update({"isPartOf":})
+                if value['localPrincipal']:
+                    relations.update({"hasOwningPrincipal":value['localPrincipal']})
+                # relations.update({"hasTag":})
+
+            # create relationships for host id
+            # mapping of cdm fields to relationships of nodes
+            elif nodeType == 'Event':
+                # if value['hostId'] != None:
+                #     relations.update({'runsOn':value['hostId']})
+                if value['subject'] != None:
+                    relations.update({'isGeneratedBy':value['subject']['com.bbn.tc.schema.avro.cdm18.UUID']})
+                if value['predicateObject'] != None:
+                    relations.update({'affects':value['predicateObject']['com.bbn.tc.schema.avro.cdm18.UUID']})
+                if value['predicateObject2'] != None:
+                    relations.update({'affects2':value['predicateObject2']['com.bbn.tc.schema.avro.cdm18.UUID']})
+
+            elif nodeType == 'Principal':
+                if value['hostId'] != None:
+                    relations.update({'hasAccountOn':value['hostId']})
+
+            elif nodeType == 'UnnamedPipeObject':
+                if value['sourceUUID'] != None:
+                    relations.update({'affects':value['sourceUUID']['com.bbn.tc.schema.avro.cdm18.UUID']})
+                if value['sinkUUID'] != None:
+                    relations.update({'affects2':value["sinkUUID"]["com.bbn.tc.schema.avro.cdm18.UUID"]})
+                if value['baseObject'] != None:
+                    relations.update({'residesOn':value['baseObject']['hostId']})
+
+            elif nodeType == 'NetFlowObject':
+                if value['baseObject'] != None:
+                    relations.update({'residesOn':value['baseObject']['hostId']})
+
+            elif nodeType == 'SrcSinkObject':
+                if value['baseObject'] != None:
+                    relations.update({'residesOn':value['baseObject']['hostId']})
+
+
+
+
+            # relationships to cypher
+            for rel in relations.keys():
+                val = relations[rel]
+                if val != '':
+
+
+
+                    if rel =="residesOn":
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'he', ':Host')
+
+                    elif rel == "runsOn":
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'hu', ':Host')
+
+                    elif rel =="isGeneratedBy":
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 's', f':Subject:{second_label}')
+
+                    elif rel =="hasOwningPrincipal":
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'p', ':Principal')
+
+                    elif rel =="affects":
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'a', f':{second_label}')
+
+                    elif rel == 'affects2':
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'a', f':{second_label}')
+                    # ... other relations for Object not in data
+                    elif rel =="hasParent":
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'not', f':{second_label}')
+
+                    elif rel =='hasAccountOn':
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'not', f':{second_label}')
+
+                    elif rel == 'hasLocalPrincipal':
+                        batchData.nodes_relations_to_cypher(val, newIdentifier, rel, 'not', f':{second_label}')
+
+            batchData.append_create_Nodes(f"""({newIdentifier}:{nodeType}:{second_label} ${newIdentifier})""")
+        except:
+            print('Exception')
+            print('input: ', input)
+            print('relations: ', relations)
+            break
+
+
+
+    if batchData.is_empty_lookup_nodes(): # no lookup needed (a lookup is needed when edges to existing nodes are created)
+        if batchData.is_empty_insert_relations(): # no lookup and maybe no edges at all
+            query = f""" CREATE {batchData.get_create_Nodes()}"""
+        else: # no lookup, but most likely edges between the created nodes
+            query = f""" CREATE {batchData.get_create_Nodes()} CREATE {batchData.get_insert_relations()}"""
+    else: # most likely a lookup is needed and edges are being created
+        query = f"""OPTIONAL MATCH {batchData.get_lookup_nodes()} CREATE {batchData.get_create_Nodes()} CREATE {batchData.get_insert_relations()}"""
+
+    # print()
+    # print('query: ', query)
+    # print()
+    # print('attributes: ', all_values)
+    # print(nodes_count)
+    return query, all_values
+
+
+def create_cypher_query_from_cdm(json):
+    '''
+    Create Cypher Queries from publisher message
+    '''
+    query, value = parse_json_to_cypher(json)
+    return query, value
+
+def on_message(client, userdata, message):
+    '''
+    The callback function for message listener
+    '''
+    data = json.loads(message.payload.decode("utf-8"))
+    print("Received message from: ",message.topic)
+    q, value = create_cypher_query_from_cdm(data)
+    execute_query(q, value)
+
+def on_connect(client, userdata, flags, return_code):
+    '''
+    Connecting and subscribing to the Mosquitto topic
+    '''
+    if return_code == 0:
+        print("connected")
+        client.subscribe("neo4j",qos=1)
+    else:
+        print("could not connect, return code:", return_code)
+        client.failed_connect = True
+
+def connect_to_db(uri,auth):
+    '''
+    Establish db connection to neo4j
+    '''
+    driver = GraphDatabase.driver(uri, auth=auth)
+    with driver.session() as session:
+        print("Cleanup existing data...")
+        session.run("MATCH (n) detach delete n")
+        session.run("RETURN 1 as result")
+        print("Successfully connected to DB...")
+
+        if(on_disk):
+            session.run("STORAGE MODE ON_DISK_TRANSACTIONAL")
+        if (analytic):
+            session.run("STORAGE MODE IN_MEMORY_ANALYTICAL;")
+        # create indices here ....
+        if (create_indices):
+            s = f"CREATE INDEX ON :{second_label}(uuid);"
+            session.run(s)
+            session.run("CREATE INDEX ON :Subject(uuid);")
+            session.run("CREATE INDEX ON :Event(uuid);")
+            session.run("CREATE INDEX ON :Host(uuid);")
+            session.run("CREATE INDEX ON :FileObject(uuid);")
+            session.run("CREATE INDEX ON :NetFlowObject(uuid);")
+            session.run("CREATE INDEX ON :SrcSinkObject(uuid);")
+            session.run("CREATE INDEX ON :UnnamedPipeObject(uuid);")
+            session.run("CREATE INDEX ON :Principal(uuid);")
+
+    return driver
+
+
+def execute_query(query:str, value):
+    '''
+    Execute any Neo4j Query.
+
+    Expected Query Parameters:
+    query = query string,
+    value = attributes to be inserted
+    '''
+    #print(query)
+    print("Before sending: ", datetime.datetime.now().strftime("%H:%M:%S"))
+    with driver.session() as session:
+        result = session.run(query, value)
+        try:
+            summary = result.consume() # makes sure that the query was run by the database
+            return summary
+        except Exception as e:
+            print(f"An error occurred: {e}")
+
+    print("After sending: ", datetime.datetime.now().strftime("%H:%M:%S"))
+    # with driver.session() as session:
+    #     result = session.run(query, value) # maybe result = session.run(query, **value)
+    return None
+
+driver = connect_to_db(db_uri,neo4j_auth)
+
+# client.username_pw_set(username="user_name", password="password") # uncomment if you use password auth
+client.on_connect = on_connect
+client.on_message = on_message
+client.failed_connect = False
+
+client.connect(broker_hostname, broker_port,keepalive=3600*4)
+client.loop_start()
+
+# this try-finally block ensures that whenever we terminate the program early by hitting ctrl+c, it still exits gracefully
+try:
+    i = 0
+    while i < abort_time_limit: #and client.failed_connect == False:
+        time.sleep(1)
+        i += 1
+        if client.failed_connect == True:
+            print('Connection failed, exiting...')
+
+finally:
+    client.disconnect()
+    client.loop_stop()
+    driver.close()
+
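The OPTIONAL MATCH in the Muster above is what makes missing lookup nodes visible: a plain MATCH that finds nothing yields zero rows, so the following CREATE clauses are silently skipped, while OPTIONAL MATCH keeps the row with the variable bound to null, and the edge CREATE then fails loudly. A minimal sketch against an empty database, with a hypothetical UUID (not from the data set):

    from neo4j import GraphDatabase

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""))
    with driver.session() as session:
        # plain MATCH: zero rows, nothing is created, no error is raised
        session.run('MATCH (m:Node {uuid: "missing"}) CREATE (n:Node)-[:affects]->(m)').consume()
        try:
            # OPTIONAL MATCH: one row with m = null, so creating the edge raises an error
            session.run('OPTIONAL MATCH (m:Node {uuid: "missing"}) CREATE (n:Node)-[:affects]->(m)').consume()
        except Exception as e:
            print("missing lookup surfaced as:", e)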
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/sub_mem_original_sub_used_as_a_basis.py b/code/infrastructure/streaming/clients/sub/memgraph/sub_mem_original_sub_used_as_a_basis.py
new file mode 100644
index 0000000000000000000000000000000000000000..49961d1140a6e52342a192ec3890bd3b340762f9
--- /dev/null
+++ b/code/infrastructure/streaming/clients/sub/memgraph/sub_mem_original_sub_used_as_a_basis.py
@@ -0,0 +1,280 @@
+import paho.mqtt.client as mqtt
+import time
+import json
+from neo4j import GraphDatabase
+import os
+
+broker_hostname=str(os.getenv('mos_host',default="localhost"))
+broker_port = int(os.getenv('mos_port',default=1883))
+client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,"memgraph")
+db_uri =str(os.getenv('mem_host',default="bolt://localhost:8687")) # maybe change back to 7687
+neo4j_auth=("","")
+abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
+create_indices = os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
+
+def flatten_obj(key, val, target): # edited in the other versions so the names don't start with _
+    complexType = type(val)
+    if val is None:
+        return None
+    elif complexType is dict:
+        for k, v in val.items():
+            flatten_obj(key + "_" + k, v, target)
+    elif complexType is list:
+        for i in range(len(val)):
+            v = val[i]
+            flatten_obj(key + "_" + str(i), v, target)
+    elif complexType is object:
+        for k, v in val.__dict__.items():
+            flatten_obj(key + "_" + k, v, target)
+    else:
+        target[key] = val
+
+def parse_json_to_cypher(input) : # receives a message with one node; changed to multiple nodes per message in the other versions
+    jsonType = list(input['datum'].keys())[0]
+    # short type string
+    nodeType = jsonType.rsplit(".", 1)[1]
+    # data of object
+    value = input["datum"][jsonType]
+
+    relations = dict(
+        runsOn=""
+        ,isGeneratedBy=""
+        ,affects=""
+        ,affects2=""
+        ,residesOn=""
+        ,isPartOf="" # not in data set
+        ,hasOwningPricipal=""
+        ,hasTag="" # not in data set
+        ,hasParent=""
+        ,hasAccountOn=""
+        ,hasLocalPrincipal=""
+    )
+
+
+    # create relationships
+    try:
+        if nodeType == 'Subject':
+            if value['parentSubject'] != None:
+                relations.update({'hasParent':value['parentSubject']["com.bbn.tc.schema.avro.cdm18.UUID"]})
+            if value['hostId'] != None:
+                relations.update({'runsOn':value['hostId']})
+            if value['localPrincipal'] != None:
+                relations.update({'hasLocalPrincipal':value['localPrincipal']})
+            # the relationship for subject -[affects]-> event is missing... probably implicit through isGeneratedBy
+
+        if nodeType == 'FileObject':
+            if value['baseObject'] != None:
+                relations.update({"residesOn":value['baseObject']['hostId']})
+                # relations.update({"isPartOf":})
+            if value['localPrincipal']:
+                relations.update({"hasOwningPrincipal":value['localPrincipal']})
+                # relations.update({"hasTag":})
+
+        # create relationships for host id
+        # mapping of cdm fields to relationships of nodes
+        if nodeType == 'Event':
+            # if value['hostId'] != None:
+            #     relations.update({'runsOn':value['hostId']})
+            if value['subject'] != None:
+                relations.update({'isGeneratedBy':value['subject']['com.bbn.tc.schema.avro.cdm18.UUID']})
+            if value['predicateObject'] != None:
+                relations.update({'affects':value['predicateObject']['com.bbn.tc.schema.avro.cdm18.UUID']})
+            if value['predicateObject2'] != None:
+                relations.update({'affects2':value['predicateObject2']['com.bbn.tc.schema.avro.cdm18.UUID']})
+
+        if nodeType == 'Principal':
+            if value['hostId'] != None:
+                relations.update({'hasAccountOn':value['hostId']})
+
+        if nodeType == 'UnnamedPipeObject':
+            if value['sourceUUID'] != None:
+                relations.update({'affects':value['sourceUUID']['com.bbn.tc.schema.avro.cdm18.UUID']})
+            if value['sinkUUID'] != None:
+                relations.update({'affects2':value["sinkUUID"]["com.bbn.tc.schema.avro.cdm18.UUID"]})
+            if value['baseObject'] != None:
+                relations.update({'residesOn':value['baseObject']['hostId']})
+
+        if nodeType == 'NetFlowObject':
+            if value['baseObject'] != None:
+                relations.update({'residesOn':value['baseObject']['hostId']})
+
+        if nodeType == 'SrcSinkObject':
+            if value['baseObject'] != None:
+                relations.update({'residesOn':value['baseObject']['hostId']})
+
+        q_rel = ''
+        insert_strings = []
+        # lookup existing nodes for relations
+        for rel in relations.keys():
+            val = relations[rel]
+            if val != '':
+                if rel =="residesOn":
+                    s = f'''
+                    WITH new
+                    MATCH (existing:Host) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+
+                if rel == "runsOn":
+                    s = f'''
+                    WITH new
+                    MATCH (existing:Host) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+                if rel =="isGeneratedBy":
+                    s = f'''
+                    WITH new
+                    MATCH (existing:Subject) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+                if rel =="hasOwningPrincipal":
+                    s = f'''
+                    WITH new
+                    MATCH (existing:Principal) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+                if rel =="affects":
+                    s = f'''
+                    WITH new
+                    MATCH (existing) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+                if rel == 'affects2':
+                    s = f'''
+                    WITH new
+                    MATCH (existing) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+                # ... other relations for Object not in data
+                if rel =="hasParent":
+                    s = f'''
+                    WITH new
+                    MATCH (existing) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+                if rel =='hasAccountOn':
+                    s = f'''
+                    WITH new
+                    MATCH (existing) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+
+                if rel == 'hasLocalPrincipal':
+                    s = f'''
+                    WITH new
+                    MATCH (existing) where existing._uuid = "{val}"
+                    CREATE (new) -[:{rel}]-> (existing)
+                    '''
+                    insert_strings.append(s)
+        q_rel = "".join(insert_strings)
+        value_flat = {}
+        flatten_obj("",value,value_flat)
+
+        query = f"""
+        CREATE (new:{nodeType} $attributes)
+        {q_rel}
+        RETURN new
+        """
+        return query,value_flat
+    except:
+        print('input: ', input)
+        print('relations: ', relations)
+
+
+def create_cypher_query_from_cdm(json):
+    '''
+    Create Cypher Queries from publisher message
+    '''
+    query,value = parse_json_to_cypher(json)
+    return query, value
+
+def on_message(client, userdata, message):
+    '''
+    The callback function for message listener
+    '''
+    data = json.loads(message.payload.decode("utf-8"))
+    print("Received message from: ",message.topic)
+    q,attr = create_cypher_query_from_cdm(data)
+    # print(q)
+    execute_query(q,attr)
+
+def on_connect(client, userdata, flags, return_code):
+    '''
+    Connecting and subscribing to the Mosquitto topic
+    '''
+    if return_code == 0:
+        print("connected")
+        client.subscribe("neo4j",qos=1)
+    else:
+        print("could not connect, return code:", return_code)
+        client.failed_connect = True
+
+def connect_to_db(uri,auth):
+    '''
+    Establish db connection to neo4j
+    '''
+    driver = GraphDatabase.driver(uri, auth=auth)
+    with driver.session() as session:
+        print("Cleanup existing data...")
+        session.run("MATCH (n) detach delete n")
+        session.run("RETURN 1 as result")
+        print("Successfully connected to DB...")
+
+        # create indices here ....
+        if (create_indices):
+            session.run("CREATE INDEX ON :Subject(_uuid);")
+            session.run("CREATE INDEX ON :Event(_uuid);")
+            session.run("CREATE INDEX ON :Host(_uuid);")
+            session.run("CREATE INDEX ON :FileObject(_uuid);")
+            session.run("CREATE INDEX ON :NetFlowObject(_uuid);")
+            session.run("CREATE INDEX ON :SrcSinkObject(_uuid);")
+            session.run("CREATE INDEX ON :UnnamedPipeObject(_uuid);")
+            session.run("CREATE INDEX ON :Principal(_uuid);")
+
+    return driver
+
+
+def execute_query(query:str, attributes):
+    '''
+    Execute any Neo4j Query.
+
+    Expected Query Parameters:
+    query = query string,
+    attributes = attributes to be inserted
+    '''
+    with driver.session() as session:
+        result = session.run(query,attributes=attributes)
+        return result.data()
+
+driver = connect_to_db(db_uri,neo4j_auth)
+
+# client.username_pw_set(username="user_name", password="password") # uncomment if you use password auth
+client.on_connect = on_connect
+client.on_message = on_message
+client.failed_connect = False
+
+client.connect(broker_hostname, broker_port,keepalive=3600*4)
+client.loop_start()
+
+# this try-finally block ensures that whenever we terminate the program early by hitting ctrl+c, it still exits gracefully
+try:
+    i = 0
+    while i < abort_time_limit: #and client.failed_connect == False:
+        time.sleep(1)
+        i += 1
+        if client.failed_connect == True:
+            print('Connection failed, exiting...')
+
+finally:
+    client.disconnect()
+    client.loop_stop()
+    driver.close()
+
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented.py b/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented.py
index 7c4c6d9780e8dd5edbbf11669d2ada896dc29759..cb270290614227bbf85e7d519c657b3f467b582a 100644
--- a/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented.py
+++ b/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented.py
@@ -8,7 +8,7 @@ import os
 #Muster
 # MATCH lookupnodes (not known yet)
 # CREATE current Node (not using PARAM)
-# CREATE edges
+# CREATE edges of current Node
 # WITH
 # repeat next node

@@ -21,6 +21,8 @@ abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
 create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
 analytic = False

+# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary.
+# The reduced dictionary is written into the target
 def flatten_obj(key, val, target):
     complexType = type(val)
     if val is None:
@@ -53,14 +55,18 @@ def flatten_obj(key, val, target):
         target[key] = val


-# Holds a list with the Nodes to match and a list with relations to add
-# manages the build of these lists
+# subgraphNodeRelation: holds the data for each separate query part, one per node creation
+# lookup_nodes: a list of nodes to match
+# insert_relations: the Cypher string parts that create the edges
+# the get method of each list joins it into a single string separated by commas
 class subgraphNodeRelation:
     def __init__(self):
         self.lookup_nodes = []
         self.insert_relations = []

+    # Check if the target node is already known; if not, create a lookup.
+    # Use the new or old identifier and append the edge string.
     def nodes_relations_to_cypher( self, val, newIdentifier, rel, batchData, type_letter, type): #type either ':Host' 'Subject' .... or ''
         try:
             if batchData.check_key(val): # Node is already known use the same identifier
@@ -93,10 +99,11 @@ class subgraphNodeRelation:
         return True


-# Data for the whole Batch, so it doesnt get copied when used in another function
-# Holds a List with all known identifier, a directory to look up if the Node is already known
-# Holds the Batch query
-# Holds a number to create diffrent idetifier
+# batchDataHolder: holds the data for the whole batch
+# knownNodes: a dictionary of all known identifiers, used to look up whether a node is already known
+# with_list_identifier: a list of identifiers, used to append a WITH clause with all identifiers to batch_query
+# batch_query: a list of query parts, joined with WITH clauses into one Cypher query
+# identifierNumber: a counter used to create different identifiers
 class batchDataHolder:
     def __init__(self):
         self.knownNodes = dict()
@@ -144,21 +151,28 @@ class batchDataHolder:
         self.batch_query.append(' WITH ' + ', '.join(self.with_list_identifier) + ' ') # append the with identifier transform to string split by ', '


-nodes_count = 0
+#nodes_count = 0

+#parse_json_to_cypher: 1. create variables for the whole message
+# 2. read every node to create and its edges
+    #2.0 if it is not the first node, add a WITH query part so the known nodes stay usable
+    #2.1 load the string as json and flatten the node properties into the non-nested value_flat
+    #2.2 create the relationships from the json
+    #2.3 transform the relationships and the new node to a Cypher query (making use of known nodes)
+# 3. put the string (query) parts (subgraph, WITH, subgraph, ...) together
+# 4. return the query and, depending on the variant, the parameters
 def parse_json_to_cypher(data_list) :
-    global nodes_count
+    #global nodes_count

     #knownNodes = dict() #to check if Node is known and no Lookup is needed
     batchData = batchDataHolder()

-    if nodes_count % 5000 == 0:
-        print(nodes_count)
-        print(datetime.datetime.now().strftime("%H:%M:%S"))
+    #if nodes_count % 5000 == 0:
+    #    print(nodes_count)
+    #    print(datetime.datetime.now().strftime("%H:%M:%S"))

     for input_string in data_list: # run throuh subgraphs of the Batch
-        #print(nodes_count)
-        nodes_count = nodes_count +1
+        #nodes_count = nodes_count +1

         batchData.add_query_with() # add Nodes from the Subgraph before, if not the first

@@ -251,7 +265,7 @@ def parse_json_to_cypher(data_list) :

         subgraph = subgraphNodeRelation()

-        # lookup existing nodes for relations
+        # relationships to cypher
         for rel in relations.keys():
             val = relations[rel]
             if val != '':
@@ -379,10 +393,10 @@ def connect_to_db(uri,auth):
         session.run("RETURN 1 as result")
         print("Successfully connected to DB...")

+        if (analytic):
+            session.run("STORAGE MODE IN_MEMORY_ANALYTICAL;")
         # create indices here ....
         if (create_indices):
-            if (analytic):
-                session.run("STORAGE MODE IN_MEMORY_ANALYTICAL;")
             session.run("CREATE INDEX ON :Subject(uuid);")
             session.run("CREATE INDEX ON :Event(uuid);")
             session.run("CREATE INDEX ON :Host(uuid);")
@@ -403,12 +417,10 @@ def execute_query(query:str):
         query = Query String,
         attributes = atttributes to be inserted
     '''
-    #print(query)
-
-    global nodes_count
+
     if '\\' in query:
-        # Ersetze jeden `\` durch `\\` in der Zeichenkette
+        # replace \ with \\ so it doesn't get falsely interpreted
         query = query.replace("\\", "\\\\")
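The subgraph-oriented Muster chains one MATCH/CREATE block per node and joins the blocks with WITH clauses carrying every identifier bound so far, so later subgraphs can reuse already created nodes without a new lookup. A minimal sketch of the generated query shape for two subgraphs, with hypothetical UUIDs and properties inlined as in this non-parameterized variant:

    # first subgraph: lookup, node creation, edges; then a WITH keeps _0 and h0 in scope
    query = (
        'MATCH (h0:Host {uuid: "host-uuid"}) '
        'CREATE (_0:Subject {uuid: "subject-uuid"}) '
        'CREATE (_0) -[:runsOn]-> (h0) '
        'WITH _0, h0 '                           # add_query_with()
        'CREATE (_1:Event {uuid: "event-uuid"}) '
        'CREATE (_1) -[:isGeneratedBy]-> (_0)'   # _0 is reused, no second lookup needed
    )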
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented_using_Params.py b/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented_using_Params.py
index 1df83d17eb47f6f919d8ce607d1bc6dcc2dadc3f..c72603a09397f924f226e921453476545e4d7255 100644
--- a/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented_using_Params.py
+++ b/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented_using_Params.py
@@ -9,7 +9,7 @@ import os
 #Muster
 # MATCH lookupnodes (not known yet)
 # CREATE current Node (using PARAM)
-# CREATE edges
+# CREATE edges of current Node
 # WITH
 # repeat next node

@@ -22,6 +22,8 @@ abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
 create_indices = True #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
 analytic = False

+# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary.
+# The reduced dictionary is written into the target
 def flatten_obj(key, val, target):
     complexType = type(val)
     if val is None:
@@ -54,14 +56,18 @@ def flatten_obj(key, val, target):
         target[key] = val


-# Holds a list with the Nodes to match and a list with relations to add
-# manages the build of these lists
+# subgraphNodeRelation: holds the data for each separate query part, one per node creation
+# lookup_nodes: a list of nodes to match
+# insert_relations: the Cypher string parts that create the edges
+# the get method of each list joins it into a single string separated by commas
 class subgraphNodeRelation:
     def __init__(self):
         self.lookup_nodes = []
         self.insert_relations = []

+    # Check if the target node is already known; if not, create a lookup.
+    # Use the new or old identifier and append the edge string.
     def nodes_relations_to_cypher( self, val, newIdentifier, rel, batchData, type_letter, type): #type either ':Host' 'Subject' .... or ''
         try:
             if batchData.check_key(val): # Node is already known use the same identifier
@@ -94,10 +100,11 @@ class subgraphNodeRelation:
         return True


-# Data for the whole Batch, so it doesnt get copied when used in another function
-# Holds a List with all known identifier, a directory to look up if the Node is already known
-# Holds the Batch query
-# Holds a number to create diffrent idetifier
+# batchDataHolder: holds the data for the whole batch
+# knownNodes: a dictionary of all known identifiers, used to look up whether a node is already known
+# with_list_identifier: a list of identifiers, used to append a WITH clause with all identifiers to batch_query
+# batch_query: a list of query parts, joined with WITH clauses into one Cypher query
+# identifierNumber: a counter used to create different identifiers
 class batchDataHolder:
     def __init__(self):
         self.knownNodes = dict()
@@ -145,23 +152,29 @@ class batchDataHolder:
         self.batch_query.append(' WITH ' + ', '.join(self.with_list_identifier) + ' ') # append the with identifier transform to string split by ', '


-nodes_count = 0
+#nodes_count = 0

+#parse_json_to_cypher: 1. create variables for the whole message
+# 2. read every node to create and its edges
+    #2.0 if it is not the first node, add a WITH query part so the known nodes stay usable
+    #2.1 load the string as json and flatten the node properties into the non-nested value_flat
+    #2.2 create the relationships from the json
+    #2.3 transform the relationships and the new node to a Cypher query (making use of known nodes)
+# 3. put the string (query) parts (subgraph, WITH, subgraph, ...) together
+# 4. return the query and the parameters
 def parse_json_to_cypher(data_list) :
-    global nodes_count
+    #global nodes_count

-
-    #knownNodes = dict() #to check if Node is known and no Lookup is needed
     batchData = batchDataHolder()
-    all_values = {}
+    all_values = {} # maps the identifier written in the query to the property values to be inserted

     for input_string in data_list: # run throuh subgraphs of the Batch

-        if nodes_count % 5000 == 0:
-            print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
+        #if nodes_count % 5000 == 0:
+        #    print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))

-        nodes_count = nodes_count +1
+        #nodes_count = nodes_count +1

         batchData.add_query_with() # add Nodes from Subgraph before, if not first

@@ -253,7 +266,7 @@ def parse_json_to_cypher(data_list) :

         subgraph = subgraphNodeRelation()

-        # lookup existing nodes for relations
+        # relationships to cypher
         for rel in relations.keys():
             val = relations[rel]
             if val != '':
diff --git a/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented_using_Params_two_label.py b/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented_using_Params_two_label.py
index 1bc12552f6428b6fbdd6b98a31796b500dd8b7d0..6f820a0b06fb65c06e258257b49e8cbd50981487 100644
--- a/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented_using_Params_two_label.py
+++ b/code/infrastructure/streaming/clients/sub/memgraph/subgraph_oriented/sub_mem_whole_batch_subgraph_oriented_using_Params_two_label.py
@@ -9,7 +9,7 @@ import os
 #Muster
 # MATCH lookupnodes (not known yet)
 # CREATE current Node (using PARAM)
-# CREATE edges
+# CREATE edges of current Node
 # WITH
 # repeat next node

@@ -22,6 +22,8 @@ abort_time_limit = int(os.getenv('abort_time_limit', default=99999))
 create_indices = False #os.getenv('create_indices', 'False').lower() in ['true', '1', 't', 'y', 'yes']
 second_label = 'Node'

+# flatten_obj receives a complex object val and reduces it into a one-level deep dictionary.
+# The reduced dictionary is written into the target
 def flatten_obj(key, val, target):
     complexType = type(val)
     if val is None:
@@ -53,15 +55,18 @@ def flatten_obj(key, val, target):
     else:
         target[key] = val

-
-# Holds a list with the Nodes to match and a list with relations to add
-# manages the build of these lists
+# subgraphNodeRelation: holds the data for each separate query part, one per node creation
+# lookup_nodes: a list of nodes to match
+# insert_relations: the Cypher string parts that create the edges
+# the get method of each list joins it into a single string separated by commas
 class subgraphNodeRelation:
     def __init__(self):
         self.lookup_nodes = []
         self.insert_relations = []

+    # Check if the target node is already known; if not, create a lookup.
+    # Use the new or old identifier and append the edge string.
     def nodes_relations_to_cypher( self, val, newIdentifier, rel, batchData, type_letter, type): #type either ':Host' 'Subject' .... or ''
         try:
             if batchData.check_key(val): # Node is already known use the same identifier
@@ -94,10 +99,11 @@ class subgraphNodeRelation:
         return True


-# Data for the whole Batch, so it doesnt get copied when used in another function
-# Holds a List with all known identifier, a directory to look up if the Node is already known
-# Holds the Batch query
-# Holds a number to create diffrent idetifier
+# batchDataHolder: holds the data for the whole batch
+# knownNodes: a dictionary of all known identifiers, used to look up whether a node is already known
+# with_list_identifier: a list of identifiers, used to append a WITH clause with all identifiers to batch_query
+# batch_query: a list of query parts, joined with WITH clauses into one Cypher query
+# identifierNumber: a counter used to create different identifiers
 class batchDataHolder:
     def __init__(self):
         self.knownNodes = dict()
@@ -145,22 +151,29 @@ class batchDataHolder:
         self.batch_query.append(' WITH ' + ', '.join(self.with_list_identifier) + ' ') # append the with identifier transform to string split by ', '


-nodes_count = 0
+#nodes_count = 0

+#parse_json_to_cypher: 1. create variables for the whole message
+# 2. read every node to create and its edges
+    #2.0 if it is not the first node, add a WITH query part so the known nodes stay usable
+    #2.1 load the string as json and flatten the node properties into the non-nested value_flat
+    #2.2 create the relationships from the json
+    #2.3 transform the relationships and the new node to a Cypher query (making use of known nodes)
+# 3. put the string (query) parts (subgraph, WITH, subgraph, ...) together
+# 4. return the query and the parameters
 def parse_json_to_cypher(data_list) :
-    global nodes_count
-    if nodes_count % 10000 == 0:
-        print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))
+    #global nodes_count
+    #if nodes_count % 10000 == 0:
+    #    print("Nodes: ", nodes_count, " Time: ", datetime.datetime.now().strftime("%H:%M:%S"))

-    #knownNodes = dict() #to check if Node is known and no Lookup is needed
     batchData = batchDataHolder()
-    all_values = {}
+    all_values = {} # maps the identifier written in the query to the property values to be inserted

     for input_string in data_list: # run throuh subgraphs of the Batch

-        nodes_count = nodes_count +1
+        #nodes_count = nodes_count +1

         batchData.add_query_with() # add Nodes from Subgraph before, if not first

@@ -252,7 +265,7 @@ def parse_json_to_cypher(data_list) :

         subgraph = subgraphNodeRelation()

-        # lookup existing nodes for relations
+        # relationships to cypher
         for rel in relations.keys():
             val = relations[rel]
             if val != '':
@@ -293,9 +306,9 @@ def parse_json_to_cypher(data_list) :
         #attribute_string = ', '.join([f"{k[1:]}: '{v}'" if "properties_map_arg_mem_flags" not in k else f"{k[1:]}: {v}" for k, v in value_flat.items()])

         if subgraph.is_empty_lookup_nodes():
-            if subgraph.is_empty_insert_relations(): # no relations or Nodes to add (should not happen)
+            if subgraph.is_empty_insert_relations(): # no relations to add and no nodes to look up (should not happen)
                 batchData.append_query(f"""CREATE ({newIdentifier}:{nodeType}:{second_label} ${newIdentifier})""")
-            else: # no new Nodes happen when all Nodes already matched
+            else: # all target nodes already known, no new lookup needed
                 batchData.append_query(f"""CREATE ({newIdentifier}:{nodeType}:{second_label} ${newIdentifier}) CREATE {subgraph.get_insert_relations()}""")
         else:
             batchData.append_query(f"""MATCH {subgraph.get_lookup_nodes()} CREATE ({newIdentifier}:{nodeType}:{second_label} ${newIdentifier}) CREATE {subgraph.get_insert_relations()}""")
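In this parameterized two-label variant each subgraph contributes a block of the shape below; the node properties travel in all_values instead of being inlined into the query string. A minimal sketch with hypothetical names and UUIDs (not from the data set):

    newIdentifier, nodeType, second_label = "_0", "Event", "Node"
    # one per-subgraph block: lookup, parameterized node creation, edge creation
    block = (
        f'MATCH (s1:Subject {{uuid: "subject-uuid"}}) '
        f'CREATE ({newIdentifier}:{nodeType}:{second_label} ${newIdentifier}) '
        f'CREATE ({newIdentifier}) -[:isGeneratedBy]-> (s1)'
    )
    all_values = {newIdentifier: {"uuid": "event-uuid", "type": "EVENT_READ"}}
    # the WITH-joined blocks are executed later, roughly as session.run(full_query, all_values)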